/*
 * Copyright (c) 2014-2022 Monix Contributors.
 * See the project homepage at: https://monix.io
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package monix.reactive

import java.io.{ BufferedReader, InputStream, PrintStream, Reader }

import cats.{
  ~>,
  Alternative,
  Applicative,
  Apply,
  CoflatMap,
  Eq,
  FlatMap,
  Functor,
  FunctorFilter,
  Monoid,
  NonEmptyParallel,
  Order
}
import cats.effect.{ Bracket, Effect, ExitCase, Resource }
import monix.eval.{ Coeval, Task, TaskLift, TaskLike }
import monix.eval.Task.defaultOptions
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.ChannelType.MultiProducer
import monix.execution._
import monix.execution.annotations.{ UnsafeBecauseImpure, UnsafeProtocol }
import monix.execution.cancelables.{ BooleanCancelable, SingleAssignCancelable }
import monix.execution.exceptions.{ DownstreamTimeoutException, UpstreamTimeoutException }
import monix.reactive.Observable.{ Operator, Transformer }
import monix.reactive.OverflowStrategy.Synchronous
import monix.reactive.internal.builders
import monix.reactive.internal.builders._
import monix.reactive.internal.deprecated.{ ObservableDeprecatedBuilders, ObservableDeprecatedMethods }
import monix.reactive.internal.operators._
import monix.reactive.internal.subscribers.ForeachSubscriber
import monix.reactive.observables._
import monix.reactive.observers._
import monix.reactive.subjects._
import org.reactivestreams.{ Publisher => RPublisher, Subscriber => RSubscriber }

import scala.collection.mutable
import scala.collection.immutable
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success, Try }

/** The `Observable` type that implements the Reactive Pattern.
  *
  * Provides methods of subscribing to the Observable and operators
  * for combining observable sources, filtering, modifying,
  * throttling, buffering, error handling and others.
  *
  * See the available documentation at: [[https://monix.io]]
  *
  * @define concatMergeDifference ==Concat vs Merge==
  *
  *         The difference between the [[Observable!.concat concat]]
  *         operation and [[Observable!.merge merge]] is that `concat`
  *         cares about the ordering of sequences (e.g. all items
  *         emitted by the first observable in the sequence will come
  *         before the elements emitted by the second observable),
  *         whereas `merge` doesn't care about that (elements get
  *         emitted as they come). Because of back-pressure applied to
  *         observables, `concat` is safe to use in all contexts,
  *         whereas `merge` requires buffering. Or in other words
  *         `concat` has deterministic, lawful behavior (being the
  *         "monadic bind"), whereas `merge` has non-deterministic
  *         behavior.
  *
  * @define concatDescription Concatenates the sequence of observables
  *         emitted by the source into one observable, without any
  *         transformation.
  *
  *         You can combine the items emitted by multiple observables
  *         so that they act like a single sequence by using this
  *         operator.
  *
  *         This operation is the "monadic bind", implementing the
  *         `flatMap` operation of [[cats.Monad]].
  *
  *         $concatMergeDifference
  *
  * @define delayErrorsDescription ==Delaying Errors==
  *
  *         This version reserves `onError` notifications until
  *         all of the observables complete and only then passes the
  *         issued error(s) downstream. Note that the streamed error is a
  *         [[monix.execution.exceptions.CompositeException CompositeException]],
  *         since multiple errors from multiple streams can happen.
  *
  * @define concatReturn an observable that emits the merged events of all
  *         streams created by the source
  *
  * @define switchMapDescription Returns a new observable that emits the items
  *         emitted by the observable most recently generated by the
  *         mapping function.
  *
  * @define overflowStrategyParam the [[OverflowStrategy overflow strategy]]
  *         used for buffering, which specifies what to do in case
  *         we're dealing with a slow consumer - should an unbounded
  *         buffer be used, should back-pressure be applied, should
  *         the pipeline drop newer or older events, should it drop
  *         the whole buffer? See [[OverflowStrategy]] for more
  *         details.
  *
  * @define defaultOverflowStrategy this operation needs to do buffering
  *         and by not specifying an [[OverflowStrategy]], the
  *         [[OverflowStrategy.Default default strategy]] is being
  *         used.
  *
  * @define mergeMapDescription Creates a new observable by applying a
  *         function that you supply to each item emitted by the
  *         source observable, where that function returns an
  *         observable, and then merging those resulting observable
  *         and emitting the results of this merger.
  *
  *         $concatMergeDifference
  *
  * @define mergeMapReturn an observable that emits the result of applying the
  *         transformation function to each item emitted by the source
  *         observable and merging the results of the observables
  *         obtained from this transformation.
  *
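  * @define mergeDescription Merges the sequence of observables emitted by
  *         the source into one observable, without any transformation.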
  *
  * @define mergeReturn an observable containing the merged events of all
  *         streams created by the source
  *
  * @define onOverflowParam a function that is used for signaling a special
  *         event used to inform the consumers that an overflow event
  *         happened, function that receives the number of dropped
  *         events as a parameter (see [[OverflowStrategy.Evicted]])
  *
  * @define bufferWithSelectorDesc Periodically gather items emitted by
  *         an observable into bundles and emit these bundles rather than
  *         emitting the items one at a time, whenever the `selector`
  *         observable signals an event.
  *
  *         The resulting observable collects the elements of the source
  *         in a buffer. Whenever the given `selector` observable emits an
  *         `onNext` event, that buffer is emitted downstream as a sequence
  *         and then reset. Thus the resulting observable emits connected,
  *         non-overlapping bundles triggered by the given `selector`.
  *
  *         If `selector` terminates with an `onComplete`, then the resulting
  *         observable also terminates normally. If `selector` terminates with
  *         an `onError`, then the resulting observable also terminates with an
  *         error.
  *
  *         If the source observable completes, then the current buffer gets
  *         signaled downstream. If the source triggers an error then the
  *         current buffer is being dropped and the error gets propagated
  *         immediately.
  *
  * @define unsafeBecauseImpure '''UNSAFE WARNING''':
  *         this operation can trigger the execution of side effects, which
  *         breaks referential transparency and is thus not a pure function.
  *
  *         For FP code these functions shouldn't be called until
  *         "the end of the world", which is to say at the end of
  *         the program (for a console app), or at the end of a web
  *         request.
  *
  *         Otherwise for modifying or operating on streams, prefer
  *         its pure functions like [[publishSelector]] for sharing
  *         the data source, or [[map]] or [[flatMap]] for operating
  *         on its events. Or in case of specialized logic, prefer
  *         to suspend these side effects via
  *         [[monix.reactive.Observable.suspend Observable.suspend]].
  *         Monix also provides [[monix.eval.Task Task]] which can
  *         also be used for suspending side effects and the `Task`
  *         was built to interop well with `Observable`.
  *
  * @define unsafeSubscribe '''UNSAFE PROTOCOL:''' This function is
  *         "unsafe" to call because it does not protect the calls to
  *         the given [[Observer]] implementation and thus knowledge
  *         of the protocol is needed.
  *
  *         Prefer normal
  *         [[monix.reactive.Observable!.subscribe(subscriber* subscribe]]
  *         when consuming a stream, these unsafe subscription methods
  *         being useful when building operators and for testing
  *         purposes.
  *
  *         Normal `subscribe` protects users in these ways:
  *
  *          - it does a best effort attempt to catch and report
  *            exceptions that violate the protocol
  *          - the final `onComplete` or `onError` message is
  *            guaranteed to be signaled after the completion
  *            of the [[monix.execution.Ack acknowledgement]]
  *            received from the last `onNext`; the internal
  *            protocol doesn't require back-pressuring of
  *            this last message for performance reasons
  *
  * @define catsOrderInterop ==Cats Order and Scala Interop==
  *
  *         Monix prefers to work with [[cats.Order]] for assessing the order
  *         of elements that have an ordering defined, instead of
  *         [[scala.math.Ordering]].
  *
  *         We do this for consistency, as Monix is now building on top of Cats.
  *         This may change in the future, depending on what happens with
  *         [[https://github.com/typelevel/cats/issues/2455 typelevel/cats#2455]].
  *
  *         Building a `cats.Order` is easy to do if you already have a
  *         Scala `Ordering` instance:
  *         {{{
  *           import cats.Order
  *
  *           case class Person(name: String, age: Int)
  *
  *           // Starting from a Scala Ordering
  *           implicit val scalaOrderingForPerson: Ordering[Person] =
  *             new Ordering[Person] {
  *               def compare(x: Person, y: Person): Int =
  *                 x.age.compareTo(y.age) match {
  *                   case 0 => x.name.compareTo(y.name)
  *                   case o => o
  *                 }
  *             }
  *
  *           // Building a cats.Order from it
  *           implicit val catsOrderForPerson: Order[Person] =
  *             Order.fromOrdering
  *         }}}
  *
  *         You can also do that in reverse, so you can prefer `cats.Order`
  *         (due to Cats also exposing laws and tests for free) and build a
  *         Scala `Ordering` when needed:
  *         {{{
  *           val scalaOrdering = catsOrderForPerson.toOrdering
  *         }}}
  *
  * @define catsEqInterop ==Cats Eq and Scala Interop==
  *
  *         Monix prefers to work with [[cats.Eq]] for assessing the equality
  *         of elements, instead of [[scala.math.Equiv]].
  *
  *         We do this because Scala's `Equiv` has a default instance defined
  *         that's based on universal equality and that's a big problem, because
  *         when using the `Eq` type class, it is universal equality that we
  *         want to avoid and there have been countless bugs in the ecosystem
  *         related to both universal equality and `Equiv`. Thankfully people
  *         are working to fix it.
  *
  *         We also do this for consistency, as Monix is now building on top of
  *         Cats. This may change in the future, depending on what happens with
  *         [[https://github.com/typelevel/cats/issues/2455 typelevel/cats#2455]].
  *
  *         Defining an `Eq` instance is easy and we can use universal equality
  *         in our definitions as well:
  *         {{{
  *           import cats.Eq
  *
  *           case class Address(host: String, port: Int)
  *
  *           implicit val eqForAddress: Eq[Address] =
  *             Eq.fromUniversalEquals
  *         }}}
  */
abstract class Observable[+A] extends Serializable { self =>

  // -----------------------------------------------------------------------
  // Impure operations (that break referential transparency) ...

  /** Characteristic function for an `Observable` instance, that creates
    * the subscription and that eventually starts the streaming of
    * events to the given [[Observer]], to be provided by observable
    * implementations.
    *
    * This member is abstract: concrete observables define what
    * subscribing means by implementing it.
    *
    * $unsafeSubscribe
    *
    * $unsafeBecauseImpure
    *
    * @param subscriber is the `Subscriber` that will receive the events
    *        of this stream
    *
    * @return a `Cancelable` representing the created subscription
    */
  @UnsafeProtocol
  @UnsafeBecauseImpure
  def unsafeSubscribeFn(subscriber: Subscriber[A]): Cancelable

  /** Subscribes the given [[monix.reactive.Observer observer]] to this
    * observable, using the implicit
    * [[monix.execution.Scheduler scheduler]] for managing async
    * boundaries.
    *
    * The observer and the scheduler are packaged into a `Subscriber`,
    * which is then handed to the
    * [[Observable.unsafeSubscribeFn(subscriber* abstract method]].
    *
    * $unsafeSubscribe
    *
    * $unsafeBecauseImpure
    *
    * @param observer is the observer that will receive the events
    * @param s is the scheduler packaged together with the observer
    *
    * @return the `Cancelable` returned by the underlying subscription
    */
  @UnsafeProtocol
  @UnsafeBecauseImpure
  final def unsafeSubscribeFn(observer: Observer[A])(implicit s: Scheduler): Cancelable = {
    // The static type `Subscriber[A]` selects the abstract overload
    val subscriber: Subscriber[A] = Subscriber(observer, s)
    self.unsafeSubscribeFn(subscriber)
  }

  /** Subscribes the given [[Observer]] to the stream.
    *
    * The observer and the implicit scheduler are packaged into a
    * `Subscriber`, which is then passed to the `subscribe` overload
    * that accepts a `Subscriber`.
    *
    * $unsafeBecauseImpure
    *
    * @param observer is the observer that will receive the events
    * @param s is the scheduler packaged together with the observer
    *
    * @return a subscription that can be used to cancel the streaming.
    * @see [[consumeWith]] for another way of consuming observables
    */
  @UnsafeBecauseImpure
  final def subscribe(observer: Observer[A])(implicit s: Scheduler): Cancelable = {
    // The static type `Subscriber[A]` selects the `Subscriber` overload
    val subscriber: Subscriber[A] = Subscriber(observer, s)
    self.subscribe(subscriber)
  }

  /** Subscribes the given `Subscriber` to the stream.
    *
    * The subscriber is first wrapped via `SafeSubscriber` and the
    * result is then passed to
    * [[Observable.unsafeSubscribeFn(subscriber* unsafeSubscribeFn]].
    *
    * $unsafeBecauseImpure
    *
    * @param subscriber is the subscriber that will receive the events
    *
    * @return a subscription that can be used to cancel the streaming.
    * @see [[consumeWith]] for another way of consuming observables
    */
  @UnsafeBecauseImpure
  final def subscribe(subscriber: Subscriber[A]): Cancelable = {
    val safe = SafeSubscriber[A](subscriber)
    self.unsafeSubscribeFn(safe)
  }

  /** Subscribes to the stream with callbacks for elements and errors.
    *
    * Delegates to the three-callback `subscribe` overload, supplying
    * a completion callback that does nothing.
    *
    * $unsafeBecauseImpure
    *
    * @param nextFn is called for each element, its result being the
    *        acknowledgement for that element
    * @param errorFn is called with the error, in case one is signaled
    * @param s is the scheduler used for the subscription
    *
    * @return a subscription that can be used to cancel the streaming.
    * @see [[consumeWith]] for another way of consuming observables
    */
  @UnsafeBecauseImpure
  final def subscribe(nextFn: A => Future[Ack], errorFn: Throwable => Unit)(implicit s: Scheduler): Cancelable = {
    val ignoreCompletion: () => Unit = () => ()
    self.subscribe(nextFn, errorFn, ignoreCompletion)
  }

  /** Subscribes to the stream, ignoring the emitted elements.
    *
    * Delegates to the single-callback `subscribe` overload with a
    * function that answers `Continue` for every element.
    *
    * $unsafeBecauseImpure
    *
    * @param s is the scheduler used for the subscription
    *
    * @return a subscription that can be used to cancel the streaming.
    * @see [[consumeWith]] for another way of consuming observables
    */
  @UnsafeBecauseImpure
  final def subscribe()(implicit s: Scheduler): Cancelable = {
    // Explicit function type, so only the callback overload applies
    val alwaysContinue: A => Future[Ack] = _ => Continue
    self.subscribe(alwaysContinue)
  }

  /** Subscribes to the stream with a callback for elements only.
    *
    * Delegates to the three-callback `subscribe` overload: errors are
    * passed to `reportFailure` on the given scheduler and the
    * completion callback does nothing.
    *
    * $unsafeBecauseImpure
    *
    * @param nextFn is called for each element, its result being the
    *        acknowledgement for that element
    * @param s is the scheduler used for the subscription and for
    *        reporting errors
    *
    * @return a subscription that can be used to cancel the streaming.
    * @see [[consumeWith]] for another way of consuming observables
    */
  @UnsafeBecauseImpure
  final def subscribe(nextFn: A => Future[Ack])(implicit s: Scheduler): Cancelable = {
    val reportToScheduler: Throwable => Unit = ex => s.reportFailure(ex)
    val ignoreCompletion: () => Unit = () => ()
    self.subscribe(nextFn, reportToScheduler, ignoreCompletion)
  }

  /** Subscribes to the stream with one callback per kind of event.
    *
    * Builds a `Subscriber` that forwards `onNext`, `onError` and
    * `onComplete` to the given functions and that exposes the given
    * scheduler, then passes it to the `subscribe` overload that
    * accepts a `Subscriber`.
    *
    * $unsafeBecauseImpure
    *
    * @param nextFn is called for each element, its result being the
    *        acknowledgement for that element
    * @param errorFn is called with the error, in case one is signaled
    * @param completedFn is called when completion is signaled
    * @param s is the scheduler exposed by the created subscriber
    *
    * @return a subscription that can be used to cancel the streaming.
    * @see [[consumeWith]] for another way of consuming observables
    */
  @UnsafeBecauseImpure
  final def subscribe(nextFn: A => Future[Ack], errorFn: Throwable => Unit, completedFn: () => Unit)(implicit
    s: Scheduler
  ): Cancelable = {
    val callbackSubscriber: Subscriber[A] =
      new Subscriber[A] {
        implicit val scheduler: Scheduler = s

        def onNext(elem: A): Future[Ack] = nextFn(elem)
        def onError(ex: Throwable): Unit = errorFn(ex)
        def onComplete(): Unit = completedFn()
      }

    self.subscribe(callbackSubscriber)
  }

  /** Turns this observable into a multicast observable, which is
    * useful for turning a cold observable into a hot one (i.e. whose
    * source is shared by all observers).
    *
    * Delegates to `ConnectableObservable.multicast`, with this
    * observable as the source.
    *
    * $unsafeBecauseImpure
    *
    * @param pipe is the `Pipe` given to `ConnectableObservable.multicast`
    * @param s is the scheduler given to `ConnectableObservable.multicast`
    */
  @UnsafeBecauseImpure
  final def multicast[B >: A, R](pipe: Pipe[B, R])(implicit s: Scheduler): ConnectableObservable[R] = {
    val source: Observable[B] = self
    ConnectableObservable.multicast(source, pipe)
  }

  /** Returns a new Observable that multi-casts (shares) the original
    * Observable between multiple consumers.
    *
    * Equivalent to calling [[publish]] and then `refCount` on the
    * resulting `ConnectableObservable`.
    *
    * $unsafeBecauseImpure
    *
    * @param s is the scheduler passed on to [[publish]]
    */
  @UnsafeBecauseImpure
  final def share(implicit s: Scheduler): Observable[A] = {
    val connectable: ConnectableObservable[A] = self.publish
    connectable.refCount
  }

  /** Turns this observable into a multicast observable, which is
    * useful for turning a cold observable into a hot one (i.e. whose
    * source is shared by all observers). The underlying subject used
    * is a [[monix.reactive.subjects.PublishSubject PublishSubject]].
    *
    * A fresh `PublishSubject` is created on each call and passed to
    * [[unsafeMulticast]].
    *
    * $unsafeBecauseImpure
    *
    * @param s is the scheduler passed on to [[unsafeMulticast]]
    */
  @UnsafeBecauseImpure
  final def publish(implicit s: Scheduler): ConnectableObservable[A] = {
    val subject = PublishSubject[A]()
    self.unsafeMulticast(subject)
  }

  /** Caches the emissions of the source Observable and replays them,
    * in order, to any subsequent Subscribers. This operator behaves
    * similarly to [[Observable!.replay(implicit* replay]], except
    * that it auto-subscribes to the source Observable rather than
    * returning a
    * [[monix.reactive.observables.ConnectableObservable ConnectableObservable]]
    * for which you must call
    * [[monix.reactive.observables.ConnectableObservable.connect connect]]
    * to activate the subscription.
    *
    * Calling `cache` does not by itself subscribe to the source
    * Observable, so no items are cached yet. That only happens when
    * the first Subscriber calls the resulting Observable's
    * `subscribe` method.
    *
    * Note: You sacrifice the ability to cancel the origin when you
    * use the cache operator, so be careful not to use this on
    * Observables that emit an infinite or very large number of items
    * that will use up memory.
    *
    * Delegates to `CachedObservable.create`.
    *
    * $unsafeBecauseImpure
    *
    * @return an Observable that, when first subscribed to, caches all of its
    *         items and notifications for the benefit of subsequent subscribers
    */
  @UnsafeBecauseImpure
  final def cache: Observable[A] = {
    val source: Observable[A] = self
    CachedObservable.create(source)
  }

  /** Caches the emissions of the source Observable, up to a maximum
    * capacity, and replays them, in order, to any subsequent
    * Subscribers. This operator behaves similarly to
    * [[Observable!.replay(implicit* replay]], except that it
    * auto-subscribes to the source Observable rather than returning a
    * [[monix.reactive.observables.ConnectableObservable ConnectableObservable]]
    * for which you must call
    * [[monix.reactive.observables.ConnectableObservable.connect connect]]
    * to activate the subscription.
    *
    * Calling `cache` does not by itself subscribe to the source
    * Observable, so no items are cached yet. That only happens when
    * the first Subscriber calls the resulting Observable's
    * `subscribe` method.
    *
    * Delegates to `CachedObservable.create`.
    *
    * $unsafeBecauseImpure
    *
    * @param maxCapacity is the maximum buffer size after which old events
    *        start being dropped (according to what happens when using
    *        [[monix.reactive.subjects.ReplaySubject.createLimited[A](capacity:Int,initial* ReplaySubject.createLimited]])
    *
    * @return an Observable that, when first subscribed to, caches all of its
    *         items and notifications for the benefit of subsequent subscribers
    */
  @UnsafeBecauseImpure
  final def cache(maxCapacity: Int): Observable[A] = {
    val source: Observable[A] = self
    CachedObservable.create(source, maxCapacity)
  }

  /** Turns this observable into a multicast observable, which is
    * useful for turning a cold observable into a hot one (i.e. whose
    * source is shared by all observers). The underlying subject used
    * is a [[monix.reactive.subjects.BehaviorSubject BehaviorSubject]].
    *
    * A fresh `BehaviorSubject` is created on each call and passed to
    * [[unsafeMulticast]].
    *
    * $unsafeBecauseImpure
    *
    * @param initialValue is the value the `BehaviorSubject` is
    *        created with
    * @param s is the scheduler passed on to [[unsafeMulticast]]
    */
  @UnsafeBecauseImpure
  final def behavior[B >: A](initialValue: B)(implicit s: Scheduler): ConnectableObservable[B] = {
    val subject = BehaviorSubject[B](initialValue)
    self.unsafeMulticast(subject)
  }

  /** Turns this observable into a multicast observable, which is
    * useful for turning a cold observable into a hot one (i.e. whose
    * source is shared by all observers). The underlying subject used
    * is a [[monix.reactive.subjects.ReplaySubject ReplaySubject]].
    *
    * A fresh `ReplaySubject` is created on each call and passed to
    * [[unsafeMulticast]].
    *
    * $unsafeBecauseImpure
    *
    * @param s is the scheduler passed on to [[unsafeMulticast]]
    */
  @UnsafeBecauseImpure
  final def replay(implicit s: Scheduler): ConnectableObservable[A] = {
    val subject = ReplaySubject[A]()
    self.unsafeMulticast(subject)
  }

  /** Turns this observable into a multicast observable, which is
    * useful for turning a cold observable into a hot one (i.e. whose
    * source is shared by all observers). The underlying subject used
    * is a [[monix.reactive.subjects.ReplaySubject ReplaySubject]],
    * created via `ReplaySubject.createLimited`.
    *
    * $unsafeBecauseImpure
    *
    * @param bufferSize is the size of the buffer limiting the number
    *        of items that can be replayed (on overflow the head
    *        starts being dropped)
    * @param s is the scheduler passed on to [[unsafeMulticast]]
    */
  @UnsafeBecauseImpure
  final def replay(bufferSize: Int)(implicit s: Scheduler): ConnectableObservable[A] = {
    val subject = ReplaySubject.createLimited[A](bufferSize)
    self.unsafeMulticast(subject)
  }

  /** Turns this observable into a multicast observable, which is
    * useful for turning a cold observable into a hot one (i.e. whose
    * source is shared by all observers).
    *
    * '''UNSAFE PROTOCOL''': This operator is unsafe because `Subject`
    * objects are stateful and have to obey the `Observer` contract,
    * meaning that they shouldn't be subscribed multiple times, so
    * they are error prone. Only use if you know what you're doing,
    * otherwise prefer the safe [[Observable!.multicast multicast]]
    * operator.
    *
    * Delegates to `ConnectableObservable.unsafeMulticast`, with this
    * observable as the source.
    *
    * $unsafeBecauseImpure
    *
    * @param processor is the `Subject` given to
    *        `ConnectableObservable.unsafeMulticast`
    * @param s is the scheduler given to
    *        `ConnectableObservable.unsafeMulticast`
    */
  @UnsafeProtocol
  @UnsafeBecauseImpure
  final def unsafeMulticast[B >: A, R](processor: Subject[B, R])(implicit s: Scheduler): ConnectableObservable[R] = {
    val source: Observable[B] = self
    ConnectableObservable.unsafeMulticast(source, processor)
  }

  /** Turns this observable into a multicast observable, which is
    * useful for turning a cold observable into a hot one (i.e. whose
    * source is shared by all observers). The underlying subject used
    * is a [[monix.reactive.subjects.AsyncSubject AsyncSubject]].
    *
    * A fresh `AsyncSubject` is created on each call and passed to
    * [[unsafeMulticast]].
    *
    * $unsafeBecauseImpure
    *
    * @param s is the scheduler passed on to [[unsafeMulticast]]
    */
  @UnsafeBecauseImpure
  final def publishLast(implicit s: Scheduler): ConnectableObservable[A] = {
    val subject = AsyncSubject[A]()
    self.unsafeMulticast(subject)
  }

  /** Creates a new [[monix.execution.CancelableFuture CancelableFuture]]
    * that upon execution will signal the first generated element of the
    * source observable. Returns an `Option` because the source can be empty.
    *
    * Implemented by running `firstOptionL` via `runToFutureOpt`.
    *
    * $unsafeBecauseImpure
    *
    * @param s is the scheduler used for running the task
    * @param opts are the `Task.Options` used for running the task,
    *        `Task.defaultOptions` by default
    */
  @UnsafeBecauseImpure
  final def runAsyncGetFirst(implicit s: Scheduler, opts: Task.Options = defaultOptions): CancelableFuture[Option[A]] = {
    val firstElement = self.firstOptionL
    firstElement.runToFutureOpt(s, opts)
  }

  /** Creates a new [[monix.execution.CancelableFuture CancelableFuture]]
    * that upon execution will signal the last generated element of the
    * source observable. Returns an `Option` because the source can be empty.
    *
    * Implemented by running `lastOptionL` via `runToFutureOpt`.
    *
    * $unsafeBecauseImpure
    *
    * @param s is the scheduler used for running the task
    * @param opts are the `Task.Options` used for running the task,
    *        `Task.defaultOptions` by default
    */
  @UnsafeBecauseImpure
  final def runAsyncGetLast(implicit s: Scheduler, opts: Task.Options = defaultOptions): CancelableFuture[Option[A]] = {
    val lastElement = self.lastOptionL
    lastElement.runToFutureOpt(s, opts)
  }

  /** Subscribes to the source `Observable` and executes the given
    * callback for each element emitted by the source.
    *
    * The subscription is made with a `ForeachSubscriber`, through
    * [[Observable.unsafeSubscribeFn(subscriber* unsafeSubscribeFn]].
    * That subscriber receives a `Callback` that completes the
    * promise backing the returned future.
    *
    * NOTE(review): the returned future presumably completes when the
    * stream ends and fails when the stream or the callback errors;
    * confirm against `ForeachSubscriber`.
    *
    * $unsafeBecauseImpure
    *
    * @param cb is the callback to execute for each element
    * @param s is the scheduler given to the `ForeachSubscriber`
    *
    * @return a `CancelableFuture` built from the promise's future and
    *         from the `Cancelable` of the created subscription
    */
  @UnsafeBecauseImpure
  final def foreach(cb: A => Unit)(implicit s: Scheduler): CancelableFuture[Unit] = {
    val completion = Promise[Unit]()
    val subscriber = new ForeachSubscriber[A](cb, Callback.fromPromise(completion), s)
    val subscription = self.unsafeSubscribeFn(subscriber)
    CancelableFuture(completion.future, subscription)
  }

  // -----------------------------------------------------------------------
  // Pure operations ...

  /** Transforms the source using the given operator.
    *
    * @param operator is the `Operator` to apply; it is handed, together
    *        with this observable, to a new `LiftByOperatorObservable`
    *
    * @return the observable resulting from the transformation
    */
  final def liftByOperator[B](operator: Operator[A, B]): Observable[B] = {
    val source: Observable[A] = self
    new LiftByOperatorObservable(source, operator)
  }

  /** Consumes the source observable with the given [[Consumer]] on
    * execution, effectively transforming the source observable into
    * a [[monix.eval.Task Task]].
    *
    * @param f is the consumer that gets applied to this observable
    *
    * @return the task produced by applying the consumer to the source
    */
  final def consumeWith[R](f: Consumer[A, R]): Task[R] =
    f.apply(self)

  /** Polymorphic version of [[consumeWith]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLift]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * @param f is the consumer that gets applied to this observable
    * @param F is the `TaskLift` instance used for converting the
    *        resulting `Task` into `F`
    */
  final def consumeWithF[F[_], R](f: Consumer[A, R])(implicit F: TaskLift[F]): F[R] = {
    val task: Task[R] = f.apply(self)
    task.to[F]
  }

  /** Operator alias for [[prepend]].
    *
    * Its name ends in a colon, so in infix notation the element goes
    * on the left side, e.g. `elem +: observable`.
    *
    * @param elem is the element to emit before the events of the source
    */
  final def +:[B >: A](elem: B): Observable[B] =
    self.prepend[B](elem)

  /** Creates a new Observable that first emits the given element and
    * then emits the events of the source (prepend operation).
    *
    * Delegates to `Observable.cons`, with this observable as the tail.
    *
    * @param elem is the element to emit before the events of the source
    */
  final def prepend[B >: A](elem: B): Observable[B] = {
    val tail: Observable[B] = self
    Observable.cons(elem, tail)
  }

  /** Operator alias for [[append]], usable in infix notation as
    * `observable :+ elem`.
    *
    * @param elem is the element to emit after the events of the source
    */
  final def :+[B >: A](elem: B): Observable[B] =
    self.append[B](elem)

  /** Creates a new Observable that emits the events of the source and
    * then also emits the given element (appended to the stream).
    *
    * Implemented by wrapping the element via `Observable.now` and
    * passing the result to `appendAll`.
    *
    * @param elem is the element to emit after the events of the source
    */
  final def append[B >: A](elem: B): Observable[B] = {
    val trailing: Observable[B] = Observable.now(elem)
    self.appendAll(trailing)
  }

  /** Given the source observable and another `Observable`, emits all
    * of the items of whichever of the two is the first to emit an
    * item, and cancels the other one.
    *
    * Delegates to `Observable.firstStartedOf`.
    *
    * @param other is the observable competing with the source
    */
  final def ambWith[B >: A](other: Observable[B]): Observable[B] = {
    val source: Observable[B] = self
    Observable.firstStartedOf(source, other)
  }

  /** Gathers the items emitted by an observable into bundles and
    * emits these bundles rather than emitting the items one at a
    * time. This version of `buffer` emits a bundle once the internal
    * buffer has reached the given count.
    *
    * If the source observable completes, then the current buffer gets
    * signaled downstream. If the source triggers an error then the
    * current buffer is being dropped and the error gets propagated
    * immediately.
    *
    * Equivalent to calling [[bufferSliding]] with `skip` equal to
    * `count`.
    *
    * Usage:
    *
    * {{{
    *   // Emits [2, 3], [4, 5], [6]
    *   Observable.range(2, 7)
    *     .bufferTumbling(count = 2)
    * }}}
    *
    * @param count the maximum size of each buffer before it should
    *        be emitted
    */
  final def bufferTumbling(count: Int): Observable[Seq[A]] =
    self.bufferSliding(count = count, skip = count)

  /** Returns an observable that emits buffers of the items it
    * collects from the source observable. The resulting observable
    * emits a buffer every `skip` items, each buffer containing
    * `count` items.
    *
    * If the source observable completes, then the current buffer gets
    * signaled downstream. If the source triggers an error then the
    * current buffer is being dropped and the error gets propagated
    * immediately.
    *
    * For `count` and `skip` there are 3 possibilities:
    *
    *  1. in case `skip == count`, then there are no items dropped and
    *     no overlap, the call being equivalent to `bufferTumbling(count)`
    *  1. in case `skip < count`, then overlap between buffers
    *     happens, with the number of elements being repeated being
    *     `count - skip`
    *  1. in case `skip > count`, then `skip - count` elements start
    *     getting dropped between windows
    *
    * Usage:
    *
    * {{{
    *   // Emits [2, 3], [5, 6]
    *   Observable.range(2, 7)
    *     .bufferSliding(count = 2, skip = 3)
    * }}}
    *
    * {{{
    *   // Emits [2, 3, 4], [4, 5, 6]
    *   Observable.range(2, 7)
    *     .bufferSliding(count = 3, skip = 2)
    * }}}
    *
    * Implemented by lifting a `BufferSlidingOperator` via
    * [[liftByOperator]].
    *
    * @param count the maximum size of each buffer before it should
    *        be emitted
    * @param skip how many items emitted by the source observable should
    *        be skipped before starting a new buffer. Note that when
    *        skip and count are equal, this is the same operation as
    *        `bufferTumbling(count)`
    */
  final def bufferSliding(count: Int, skip: Int): Observable[Seq[A]] = {
    // Explicit type so the operator's element type is still inferred as `A`
    val operator: Operator[A, Seq[A]] = new BufferSlidingOperator(count, skip)
    self.liftByOperator(operator)
  }

  /** Periodically gathers the items emitted by an observable into
    * bundles and emits these bundles rather than emitting the items
    * one at a time.
    *
    * This version of `buffer` emits a new bundle of items
    * periodically, every timespan amount of time, containing all
    * items emitted by the source Observable since the previous bundle
    * emission.
    *
    * If the source observable completes, then the current buffer gets
    * signaled downstream. If the source triggers an error then the
    * current buffer is being dropped and the error gets propagated
    * immediately.
    *
    * Delegates to [[bufferTimedAndCounted]] with a `maxCount` of `0`.
    *
    * NOTE(review): a `maxCount` of `0` presumably means that no size
    * limit is applied; confirm against `BufferTimedObservable`.
    *
    * @param timespan the interval of time at which it should emit
    *        the buffered bundle
    */
  final def bufferTimed(timespan: FiniteDuration): Observable[Seq[A]] =
    self.bufferTimedAndCounted(timespan, maxCount = 0)

  /** Periodically gathers the items emitted by an observable into
    * bundles and emits these bundles rather than emitting the items
    * one at a time.
    *
    * The resulting observable emits connected, non-overlapping
    * buffers, each of a fixed duration specified by the `timespan`
    * argument or a maximum size specified by the `maxCount` argument
    * (whichever is reached first).
    *
    * If the source observable completes, then the current buffer gets
    * signaled downstream. If the source triggers an error then the
    * current buffer is being dropped and the error gets propagated
    * immediately.
    *
    * Implemented by a `BufferTimedObservable` wrapping this source.
    *
    * @param timespan the interval of time at which it should emit
    *        the buffered bundle
    * @param maxCount is the maximum bundle size, after which the
    *        buffered bundle gets forcefully emitted
    */
  final def bufferTimedAndCounted(timespan: FiniteDuration, maxCount: Int): Observable[Seq[A]] = {
    val source: Observable[A] = self
    new BufferTimedObservable[A](source, timespan, maxCount)
  }

  /** Periodically gathers the items emitted by an observable into
    * bundles and emits these bundles rather than emitting the items
    * one at a time, back-pressuring the source when the buffer is
    * full.
    *
    * The resulting observable emits connected, non-overlapping
    * buffers, each of a fixed duration specified by the `period`
    * argument.
    *
    * The bundles are emitted at a fixed rate. If the source is
    * silent, then the resulting observable will start emitting empty
    * sequences.
    *
    * If the source observable completes, then the current buffer gets
    * signaled downstream. If the source triggers an error then the
    * current buffer is being dropped and the error gets propagated
    * immediately.
    *
    * A `maxSize` argument is specified as the capacity of the
    * bundle. In case the source is too fast and `maxSize` is reached,
    * then the source will be back-pressured.
    *
    * A `sizeOf` argument is specified as the weight each element
    * represents in the bundle. Defaults to count each element as
    * weighting 1.
    *
    * The difference with [[bufferTimedAndCounted]] is that
    * [[bufferTimedWithPressure]] applies back-pressure from the time
    * when the buffer is full until the buffer is emitted, whereas
    * [[bufferTimedAndCounted]] will forcefully emit the buffer when
    * it's full.
    *
    * Implemented by a `BufferWithSelectorObservable` whose selector
    * is `Observable.intervalAtFixedRate(period, period)`.
    *
    * @param period the interval of time at which it should emit
    *        the buffered bundle
    * @param maxSize is the maximum buffer size, after which the
    *        source starts being back-pressured
    * @param sizeOf is the function to compute the weight of each
    *        element in the buffer
    */
  final def bufferTimedWithPressure[AA >: A](
    period: FiniteDuration,
    maxSize: Int,
    sizeOf: AA => Int = (_: AA) => 1
  ): Observable[Seq[AA]] = {
    // Each tick of this observable triggers the emission of a bundle
    val emissionTicks = Observable.intervalAtFixedRate(period, period)
    val source: Observable[AA] = self
    new BufferWithSelectorObservable(source, emissionTicks, maxSize, sizeOf)
  }

  /** Buffers elements while predicate returns true,
    * after which it emits the buffered events as a single bundle
    * and creates a new buffer.
    *
    * Usage:
    *
    * {{{
    *   import monix.eval.Task
    *
    *   Observable(1, 1, 1, 2, 2, 1, 3)
    *     .bufferWhile(_ == 1)
    *     .doOnNext(l => Task(println(s"Emitted batch $$l")))
    *
    *   // Emitted batch List(1, 1, 1)
    *   // Emitted batch List(2)
    *   // Emitted batch List(2, 1)
    *   // Emitted batch List(3)
    * }}}
    *
    * @see [[bufferWhileInclusive]] for a similar operator that includes
    *      the value that caused `predicate` to return `false`
    */
  final def bufferWhile(p: A => Boolean): Observable[Seq[A]] = {
    // Non-inclusive mode: the operator is created with `inclusive = false`
    val operator = new BufferWhileOperator(p, inclusive = false)
    self.liftByOperator(operator)
  }

  /** Buffers elements while predicate returns true,
    * after which it emits the buffered events as a single bundle,
    * including the value that caused `predicate` to return `false`
    * and creates a new buffer.
    *
    * Usage:
    *
    * {{{
    *   import monix.eval.Task
    *
    *   Observable(1, 1, 1, 2, 2, 1, 3)
    *     .bufferWhileInclusive(_ == 1)
    *     .doOnNext(l => Task(println(s"Emitted batch $$l")))
    *
    *   // Emitted batch List(1, 1, 1, 2)
    *   // Emitted batch List(2)
    *   // Emitted batch List(1, 3)
    * }}}
    *
    * @see [[bufferWhile]] for a similar operator that does not include
    *      the value that caused `predicate` to return `false`
    */
  final def bufferWhileInclusive(p: A => Boolean): Observable[Seq[A]] = {
    // Inclusive mode: the operator is created with `inclusive = true`
    val inclusiveOperator = new BufferWhileOperator(p, inclusive = true)
    self.liftByOperator(inclusiveOperator)
  }

  /** $bufferWithSelectorDesc
    *
    * @param selector is the observable that triggers the
    *        signaling of the current buffer
    */
  final def bufferWithSelector[S](selector: Observable[S]): Observable[Seq[A]] = {
    // NOTE(review): a `maxSize` of zero presumably means "no size limit";
    // confirm against `BufferWithSelectorObservable`
    val maxSize = 0
    // Every element is given a weight of 1
    new BufferWithSelectorObservable[A, S](self, selector, maxSize, _ => 1)
  }

  /** $bufferWithSelectorDesc
    *
    * A `maxSize` argument is specified as the capacity of the
    * bundle. In case the source is too fast and `maxSize` is reached,
    * then the source will be back-pressured.
    *
    * @param selector is the observable that triggers the signaling of the
    *        current buffer
    * @param maxSize is the maximum bundle size, after which the
    *        source starts being back-pressured
    */
  final def bufferWithSelector[S](selector: Observable[S], maxSize: Int): Observable[Seq[A]] = {
    // Every element is given a weight of 1 when measured against `maxSize`
    val unitWeight = (_: A) => 1
    new BufferWithSelectorObservable[A, S](self, selector, maxSize, unitWeight)
  }

  /** Buffers signals while busy, after which it emits the
    * buffered events as a single bundle.
    *
    * This operator starts applying back-pressure when the
    * underlying buffer's size is exceeded.
    *
    * Usage:
    *
    * {{{
    *   import monix.eval.Task
    *   import scala.concurrent.duration._
    *
    *   Observable.range(1, 6)
    *     .doOnNext(l => Task(println(s"Started $$l")))
    *     .bufferIntrospective(maxSize = 2)
    *     .doOnNext(l => Task(println(s"Emitted batch $$l")))
    *     .mapEval(l => Task(println(s"Processed batch $$l")).delayExecution(500.millis))
    *
    *   // Started 1
    *   // Emitted batch List(1)
    *   // Started 2
    *   // Started 3
    *   // Processed batch List(1)
    *   // Emitted batch List(2, 3)
    *   // Started 4
    *   // Started 5
    *   // Processed batch List(2, 3)
    *   // Emitted batch List(4, 5)
    *   // Processed batch List(4, 5)
    * }}}
    */
  final def bufferIntrospective(maxSize: Int): Observable[List[A]] = {
    // All the buffering logic lives in `BufferIntrospectiveObservable`;
    // this method only wires the source and the size limit into it
    val introspective = new BufferIntrospectiveObservable[A](self, maxSize)
    introspective
  }

  /** Implementation of `bracket` from `cats.effect.Bracket`.
    *
    * See [[https://typelevel.org/cats-effect/typeclasses/bracket.html documentation]].
    */
  final def bracket[B](use: A => Observable[B])(release: A => Task[Unit]): Observable[B] =
    bracketCase(use) { (resource, _) =>
      // The exit case is ignored: `release` only depends on the resource
      release(resource)
    }

  /** Version of [[bracket]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So in `release` you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  final def bracketF[F[_], B](use: A => Observable[B])(release: A => F[Unit])(implicit F: TaskLike[F]): Observable[B] =
    bracket(use) { resource =>
      // Converts the `F[Unit]` finalizer into a `Task[Unit]` via `TaskLike`
      F(release(resource))
    }

  /** Implementation of `bracketCase` from `cats.effect.Bracket`.
    *
    * See [[https://typelevel.org/cats-effect/typeclasses/bracket.html documentation]].
    */
  final def bracketCase[B](use: A => Observable[B])(release: (A, ExitCase[Throwable]) => Task[Unit]): Observable[B] = {
    // The resource stream handed to `ConcatMapObservable` is the
    // `uncancelable` version of the source, not the source itself
    val acquire = uncancelable
    new ConcatMapObservable(
      acquire,
      use,
      release,
      delayErrors = false
    )
  }

  /** Version of [[bracketCase]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So in `release` you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  final def bracketCaseF[F[_], B](use: A => Observable[B])(release: (A, ExitCase[Throwable]) => F[Unit])(implicit
    F: TaskLike[F]
  ): Observable[B] =
    bracketCase(use) { (resource, exitCase) =>
      // Converts the `F[Unit]` finalizer into a `Task[Unit]` via `TaskLike`
      val finalizer = release(resource, exitCase)
      F(finalizer)
    }

  /** Applies the given partial function to the source
    * for each element for which the given partial function is defined.
    *
    * @param pf the function that filters and maps the source
    * @return an observable that emits the transformed items by the
    *         given partial function
    */
  final def collect[B](pf: PartialFunction[A, B]): Observable[B] = {
    // The partial function is handed to `CollectOperator` as is
    val operator = new CollectOperator(pf)
    self.liftByOperator(operator)
  }

  /** Takes longest prefix of elements that satisfy the given partial function
    * and returns a new Observable that emits those elements.
    *
    * @param pf the function that filters and maps the source
    * @return an observable that emits the items transformed by the
    *         given partial function, for as long as the source's
    *         elements are contained in the function's domain
    */
  final def collectWhile[B](pf: PartialFunction[A, B]): Observable[B] = {
    // The partial function is handed to `CollectWhileOperator` as is
    val operator = new CollectWhileOperator(pf)
    self.liftByOperator(operator)
  }

  /** Creates a new observable from the source and another given
    * observable, by emitting elements combined in pairs.
    *
    * It emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    *
    * == Visual Example ==
    *
    * <pre>
    * stream1: 1 - - 2 - - 3 - 4 - -
    * stream2: 1 - - 2 - 3 - - - - 4
    *
    * result: (1, 1), (2, 2), (2, 3), (3, 3), (4, 3), (4, 4)
    * </pre>
    *
    * See [[zip]] for an alternative that pairs the items in strict sequence.
    *
    * @param other is an observable that gets paired with the source
    */
  final def combineLatest[B](other: Observable[B]): Observable[(A, B)] = {
    // The combining function simply pairs up the two values in a tuple
    val pair: (A, B) => (A, B) = (left, right) => (left, right)
    new CombineLatest2Observable[A, B, (A, B)](self, other)(pair)
  }

  /** Creates a new observable from the source and another given
    * observable, by emitting elements combined in pairs.
    *
    * It emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    *
    * == Visual Example ==
    *
    * <pre>
    * stream1: 1 - - 2 - - 3 - 4 - -
    * stream2: 1 - - 2 - 3 - - - - 4
    *
    * result: (1, 1), (2, 2), (2, 3), (3, 3), (4, 3), (4, 4)
    * </pre>
    *
    * See [[zipMap]] for an alternative that pairs the items
    * in strict sequence.
    *
    * @param other is an observable that gets paired with the source
    * @param f is a mapping function over the generated pairs
    */
  final def combineLatestMap[B, R](other: Observable[B])(f: (A, B) => R): Observable[R] = {
    // Same underlying observable as `combineLatest`, but with a
    // user-supplied function instead of tupling
    val combined = new CombineLatest2Observable[A, B, R](self, other)(f)
    combined
  }

  /** Ignores all items emitted by the source Observable and only calls
    * `onComplete` or `onError`.
    *
    * @return an empty Observable that only calls `onComplete` or `onError`,
    *         based on which one is called by the source Observable
    */
  final def completed: Observable[Nothing] = {
    // `CompletedOperator` is referenced directly, without instantiation
    val ignoreElements = CompletedOperator
    self.liftByOperator(ignoreElements)
  }

  /** Doesn't emit anything until a `timeout` period passes without the
    * source emitting anything. When that timeout happens, we
    * subscribe to the observable generated by the given function, an
    * observable that will keep emitting until the source will break
    * the silence by emitting another event.
    *
    * Note: If the source observable keeps emitting items more
    * frequently than the length of the time window, then no items
    * will be emitted by the resulting Observable.
    *
    * @param f is a function that receives the last element generated
    *        by the source, generating an observable to be subscribed
    *        when the source is timing out
    * @param timeout the length of the window of time that must pass after
    *        the emission of an item from the source Observable in
    *        which that Observable emits no items in order for the
    *        item to be emitted by the resulting Observable
    */
  final def debounceTo[B](timeout: FiniteDuration, f: A => Observable[B]): Observable[B] =
    self.switchMap { elem =>
      // Each element yields a child stream whose subscription is delayed
      // by `timeout`; the switching itself is handled by `switchMap`
      val next = f(elem)
      next.delayExecution(timeout)
    }

  /** Hold an Observer's subscription request for a specified amount of
    * time before passing it on to the source Observable.
    *
    * @param timespan is the time to wait before the subscription
    *        is being initiated.
    */
  final def delayExecution(timespan: FiniteDuration): Observable[A] = {
    // The delay logic lives in `DelayExecutionByTimespanObservable`
    val delayed = new DelayExecutionByTimespanObservable(self, timespan)
    delayed
  }

  /** Convert an observable that emits observables into a single
    * observable that emits the items emitted by the
    * most-recently-emitted of those observables.
    *
    * Similar with [[concatMap]], however the source isn't
    * back-pressured when emitting new events. Instead new events
    * being emitted are cancelling the active child observables.
    *
    * ==Example==
    *
    * The `switchMap` can express a lot of cool, time-based operations.
    * For example we can express [[debounce]] in terms of `switchMap`:
    * {{{
    *   import scala.concurrent.duration._
    *
    *   def debounce[A](stream: Observable[A], d: FiniteDuration): Observable[A] =
    *     stream.switchMap { x =>
    *       Observable.now(x).delayExecution(d)
    *     }
    * }}}
    *
    * @param f is a generator for the streams that are being merged
    */
  final def switchMap[B](f: A => Observable[B]): Observable[B] = {
    // The switching logic lives in `SwitchMapObservable`
    val switched = new SwitchMapObservable[A, B](self, f)
    switched
  }

  /** Emits the last item from the source Observable if a particular
    * timespan has passed without it emitting another item, and keeps
    * emitting that item at regular intervals until the source breaks
    * the silence.
    *
    * So compared to regular [[debounceTo]] this version
    * keeps emitting the last item of the source.
    *
    * Note: If the source Observable keeps emitting items more
    * frequently than the length of the time window then no items will
    * be emitted by the resulting Observable.
    *
    * @param period the length of the window of time that must pass after
    *        the emission of an item from the source Observable in
    *        which that Observable emits no items in order for the
    *        item to be emitted by the resulting Observable at regular
    *        intervals, also determined by period
    * @see [[echoRepeated]] for a similar operator that also mirrors
    *     the source observable
    */
  final def debounceRepeated(period: FiniteDuration): Observable[A] = {
    // `repeat = true` selects the repeating mode of `DebounceObservable`
    val debounced = new DebounceObservable(self, period, repeat = true)
    debounced
  }

  /** Emit items from the source, or emit a default item if
    * the source completes after emitting no items.
    */
  final def defaultIfEmpty[B >: A](default: => B): Observable[B] = {
    // The by-name `default` is wrapped in a thunk, so it is not
    // evaluated here, when the operator is being built
    val operator = new DefaultIfEmptyOperator[B](() => default)
    self.liftByOperator(operator)
  }

  /** Delays emitting the final `onComplete` event by the specified amount. */
  final def delayOnComplete(delay: FiniteDuration): Observable[A] = {
    // The delay logic lives in `DelayOnCompleteObservable`
    val delayed = new DelayOnCompleteObservable(self, delay)
    delayed
  }

  /** Returns an Observable that emits the items emitted by the source
    * Observable shifted forward in time by a specified delay.
    *
    * Each time the source Observable emits an item, delay starts a
    * timer, and when that timer reaches the given duration, the
    * Observable returned from delay emits the same item.
    *
    * NOTE: this delay refers strictly to the time between the
    * `onNext` event coming from our source and the time it takes the
    * downstream observer to get this event. On the other hand the
    * operator is also applying back-pressure, so on slow observers
    * the actual time passing between two successive events may be
    * higher than the specified `duration`.
    *
    * @param duration - the delay to shift the source by
    * @return the source Observable shifted in time by the specified delay
    */
  final def delayOnNext(duration: FiniteDuration): Observable[A] = {
    // The per-element delay logic lives in `DelayByTimespanObservable`
    val shifted = new DelayByTimespanObservable[A](self, duration)
    shifted
  }

  /** Returns an Observable that emits the items emitted by the source
    * Observable shifted forward in time.
    *
    * This variant of `delay` sets its delay duration on a per-item
    * basis by passing each item from the source Observable into a
    * function that returns an Observable and then monitoring those
    * Observables. When any such Observable emits an item or
    * completes, the Observable returned by delay emits the associated
    * item.
    *
    * @param selector is a function that returns an Observable for
    *        each item emitted by the source Observable, which is then
    *        used to delay the emission of that item by the resulting
    *        Observable until the Observable returned from `selector`
    *        emits an item
    * @return the source Observable shifted in time by
    *         the specified delay
    */
  final def delayOnNextBySelector[B](selector: A => Observable[B]): Observable[A] = {
    // The selector-driven delay logic lives in `DelayBySelectorObservable`
    val shifted = new DelayBySelectorObservable[A, B](self, selector)
    shifted
  }

  /** Hold an Observer's subscription request until the given `trigger`
    * observable either emits an item or completes, before passing it
    * on to the source Observable.
    *
    * If the given `trigger` completes in error, then the subscription is
    * terminated with `onError`.
    *
    * @param trigger the observable that must either emit an item or
    *        complete in order for the source to be subscribed.
    */
  final def delayExecutionWith[B](trigger: Observable[B]): Observable[A] = {
    // Waiting on `trigger` is handled by `DelayExecutionWithTriggerObservable`
    val delayed = new DelayExecutionWithTriggerObservable(self, trigger)
    delayed
  }

  /** Version of [[delayExecutionWith]] that can work with generic `F[_]`
    * tasks, anything that's supported via [[ObservableLike]] conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  final def delayExecutionWithF[F[_], B](trigger: F[B])(implicit F: ObservableLike[F]): Observable[A] = {
    // Converts the `F[B]` trigger into an observable via `ObservableLike`
    val asObservable = F(trigger)
    delayExecutionWith(asObservable)
  }

  /** Converts the source Observable that emits `Notification[A]` (the
    * result of [[materialize]]) back to an Observable that emits `A`.
    */
  final def dematerialize[B](implicit ev: A <:< Notification[B]): Observable[B] = {
    // The evidence is used as a function, in order to view
    // each element as a `Notification[B]`
    val notifications = self.map(ev)
    notifications.liftByOperator(new DematerializeOperator[B])
  }

  /** Suppress duplicate consecutive items emitted by the source.
    *
    * Example:
    * {{{
    *   // Needed to bring standard Eq instances in scope:
    *   import cats.implicits._
    *
    *   // Yields 1, 2, 1, 3, 2, 4
    *   val stream = Observable(1, 1, 1, 2, 2, 1, 1, 3, 3, 3, 2, 2, 4, 4, 4)
    *     .distinctUntilChanged
    * }}}
    *
    * Duplication is detected by using the equality relationship
    * provided by the [[cats.Eq]] type class. This allows one to
    * override the equality operation being used (e.g. maybe the
    * default `.equals` is badly defined, or maybe you want reference
    * equality, so depending on use case).
    *
    * $catsEqInterop
    *
    * @param A is the [[cats.Eq]] instance that defines equality
    *        for the elements emitted by the source
    */
  final def distinctUntilChanged[AA >: A](implicit A: Eq[AA]): Observable[AA] = {
    // The `Eq` instance (the implicit parameter named `A`) is handed
    // over explicitly to the operator
    val dedupe = new DistinctUntilChangedOperator()(A)
    self.liftByOperator(dedupe)
  }

  /** Given a function that returns a key for each element emitted by
    * the source, suppress consecutive duplicate items.
    *
    * Example:
    * {{{
    *   // Needed to bring standard instances in scope:
    *   import cats.implicits._
    *
    *   // Yields 1, 2, 3, 4
    *   val stream = Observable(1, 3, 2, 4, 2, 3, 5, 7, 4)
    *     .distinctUntilChangedByKey(_ % 2)
    * }}}
    *
    * Duplication is detected by using the equality relationship
    * provided by the [[cats.Eq]] type class. This allows one to
    * override the equality operation being used (e.g. maybe the
    * default `.equals` is badly defined, or maybe you want reference
    * equality, so depending on use case).
    *
    * $catsEqInterop
    *
    * @param key is a function that returns a `K` key for each element,
    *        a value that's then used to do the deduplication
    *
    * @param K is the [[cats.Eq]] instance that defines equality for
    *        the key type `K`
    */
  final def distinctUntilChangedByKey[K](key: A => K)(implicit K: Eq[K]): Observable[A] = {
    // The `Eq` instance for the keys (the implicit parameter named `K`)
    // is handed over explicitly to the operator
    val dedupeByKey = new DistinctUntilChangedByKeyOperator(key)(K)
    self.liftByOperator(dedupeByKey)
  }

  /** Executes the given task when the streaming is stopped
    * due to a downstream [[monix.execution.Ack.Stop Stop]] signal
    * returned by [[monix.reactive.Observer.onNext onNext]].
    *
    * The given `task` gets evaluated *before* the upstream
    * receives the `Stop` event (is back-pressured).
    *
    * Example:
    * {{{
    *   import monix.eval.Task
    *
    *   val stream = Observable.range(0, Int.MaxValue)
    *     .doOnEarlyStop(Task(println("Stopped early!")))
    *     .take(100)
    * }}}
    *
    * NOTE: in most cases what you want is [[guaranteeCase]]
    * or [[bracketCase]]. This operator is available for
    * fine-grained control.
    */
  final def doOnEarlyStop(task: Task[Unit]): Observable[A] = {
    // Running `task` is the responsibility of `DoOnEarlyStopOperator`
    val operator = new DoOnEarlyStopOperator[A](task)
    self.liftByOperator(operator)
  }

  /** Version of [[doOnEarlyStop]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * Example:
    * {{{
    *   import cats.effect.IO
    *
    *   val stream = Observable.range(0, Int.MaxValue)
    *     .doOnEarlyStopF(IO(println("Stopped early!")))
    *     .take(100)
    * }}}
    *
    * NOTE: in most cases what you want is [[guaranteeCase]]
    * or [[bracketCase]]. This operator is available for
    * fine-grained control.
    */
  final def doOnEarlyStopF[F[_]](task: F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
    // Converts the `F[Unit]` value into a `Task[Unit]` via `TaskLike`
    val asTask = F(task)
    doOnEarlyStop(asTask)
  }

  /** Executes the given callback when the connection is being cancelled,
    * via the [[monix.execution.Cancelable Cancelable]] reference returned
    * on subscribing to the created observable.
    *
    * Example:
    * {{{
    *   import monix.eval.Task
    *   import monix.execution.Scheduler
    *
    *   implicit val s = Scheduler.global
    *
    *   val cancelable =
    *     Observable
    *       .range(0, Int.MaxValue)
    *       .doOnSubscriptionCancel(Task(println("Cancelled!")))
    *       .subscribe()
    *
    *   cancelable.cancel()
    * }}}
    *
    * NOTE: in most cases what you want is [[guaranteeCase]]
    * or [[bracketCase]]. This operator is available for
    * fine-grained control.
    */
  final def doOnSubscriptionCancel(task: Task[Unit]): Observable[A] = {
    // Running `task` is the responsibility of `DoOnSubscriptionCancelObservable`
    val withCancelHook = new DoOnSubscriptionCancelObservable[A](self, task)
    withCancelHook
  }

  /** Version of [[doOnSubscriptionCancel]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * Example:
    * {{{
    *   import cats.effect.IO
    *   import monix.execution.Scheduler
    *
    *   implicit val s = Scheduler.global
    *
    *   val cancelable =
    *     Observable
    *       .range(0, Int.MaxValue)
    *       .doOnSubscriptionCancelF(IO(println("Cancelled!")))
    *       .subscribe()
    *
    *   cancelable.cancel()
    * }}}
    *
    * NOTE: in most cases what you want is [[guaranteeCase]]
    * or [[bracketCase]]. This operator is available for
    * fine-grained control.
    */
  final def doOnSubscriptionCancelF[F[_]](task: F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
    // Converts the `F[Unit]` value into a `Task[Unit]` via `TaskLike`
    val asTask = F(task)
    doOnSubscriptionCancel(asTask)
  }

  /** Evaluates the given task when the stream has ended with an
    * `onComplete` event, but before the complete event is emitted.
    *
    * The task gets evaluated and is finished *before* the `onComplete`
    * signal gets sent downstream.
    *
    * {{{
    *   import monix.eval.Task
    *
    *   Observable.range(0, 10)
    *     .doOnComplete(Task(println("Completed!")))
    * }}}
    *
    * NOTE: in most cases what you want is [[guaranteeCase]]
    * or [[bracketCase]]. This operator is available for
    * fine-grained control.
    *
    * @param task the task to execute when the `onComplete`
    *        event gets emitted
    */
  final def doOnComplete(task: Task[Unit]): Observable[A] = {
    // Running `task` is the responsibility of `DoOnCompleteOperator`
    val operator = new DoOnCompleteOperator[A](task)
    self.liftByOperator(operator)
  }

  /** Version of [[doOnComplete]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * {{{
    *   import cats.effect.IO
    *
    *   Observable.range(0, 10)
    *     .doOnCompleteF(IO(println("Completed!")))
    * }}}
    */
  final def doOnCompleteF[F[_]](task: F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
    // Converts the `F[Unit]` value into a `Task[Unit]` via `TaskLike`
    val asTask = F(task)
    doOnComplete(asTask)
  }

  /** Executes the given task when the stream is interrupted with an
    * error, before the `onError` event is emitted downstream.
    *
    * Example:
    * {{{
    *   import monix.eval.Task
    *
    *   val dummy = new RuntimeException("dummy")
    *
    *   (Observable.range(0, 10) ++ Observable.raiseError(dummy))
    *     .doOnError { e =>
    *       Task(println(s"Triggered error: $$e"))
    *     }
    * }}}
    *
    * NOTE: should protect the code in this callback, because if it
    * throws an exception the `onError` event will prefer signaling
    * the original exception and otherwise the behavior is undefined.
    *
    * NOTE: in most cases what you want is [[guaranteeCase]]
    * or [[bracketCase]]. This operator is available for
    * fine-grained control.
    */
  final def doOnError(cb: Throwable => Task[Unit]): Observable[A] = {
    // Invoking `cb` is the responsibility of `DoOnErrorOperator`
    val operator = new DoOnErrorOperator[A](cb)
    self.liftByOperator(operator)
  }

  /** Version of [[doOnError]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * {{{
    *   import cats.effect.IO
    *
    *   val dummy = new RuntimeException("dummy")
    *
    *   (Observable.range(0, 10) ++ Observable.raiseError(dummy))
    *     .doOnErrorF { e =>
    *       IO(println(s"Triggered error: $$e"))
    *     }
    * }}}
    */
  final def doOnErrorF[F[_]](cb: Throwable => F[Unit])(implicit F: TaskLike[F]): Observable[A] =
    doOnError { error =>
      // Converts the `F[Unit]` result of the callback into a `Task[Unit]`
      val effect = cb(error)
      F(effect)
    }

  /** Evaluates the given callback for each element generated by the
    * source Observable, useful for triggering async side-effects.
    *
    * @return a new Observable that executes the specified
    *         callback for each element
    *
    * @see [[doOnNextF]] for a version that can do evaluation with
    *      any data type via [[monix.eval.TaskLike]]
    */
  final def doOnNext(cb: A => Task[Unit]): Observable[A] =
    self.mapEval { elem =>
      // Runs the callback, then yields the original element unchanged
      for (_ <- cb(elem)) yield elem
    }

  /** Version of [[doOnNext]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * @return a new Observable that executes the specified
    *         callback for each element
    */
  final def doOnNextF[F[_]](cb: A => F[Unit])(implicit F: TaskLike[F]): Observable[A] =
    self.doOnNext { elem =>
      // Converts the `F[Unit]` result of the callback into a `Task[Unit]`
      val effect = cb(elem)
      F(effect)
    }

  /** Executes the given callback on each acknowledgement received from
    * the downstream subscriber, executing a generated
    * [[monix.eval.Task Task]] and back-pressuring until the task
    * is done.
    *
    * This method helps in executing logic after messages get
    * processed, for example when messages are polled from
    * some distributed message queue and an acknowledgement
    * needs to be sent after each message in order to mark it
    * as processed.
    *
    * @see [[doOnNextAckF]] for a version that can do evaluation with
    *      any data type via [[monix.eval.TaskLike]]
    */
  final def doOnNextAck(cb: (A, Ack) => Task[Unit]): Observable[A] = {
    // Invoking `cb` is the responsibility of `DoOnNextAckOperator`
    val operator = new DoOnNextAckOperator[A](cb)
    self.liftByOperator(operator)
  }

  /** Version of [[doOnNextAck]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  final def doOnNextAckF[F[_]](cb: (A, Ack) => F[Unit])(implicit F: TaskLike[F]): Observable[A] =
    doOnNextAck { (elem, ack) =>
      // Converts the `F[Unit]` result of the callback into a `Task[Unit]`,
      // with the `TaskLike` instance being passed explicitly
      val effect = cb(elem, ack)
      Task.from(effect)(F)
    }

  /** Executes the given callback only for the first element generated
    * by the source Observable, useful for doing a piece of
    * computation only when the stream starts.
    *
    * For example this observable will have a "delayed execution"
    * of 1 second, plus a delayed first element of another 1 second,
    * therefore it will take a total of 2 seconds for the first
    * element to be emitted:
    *
    * {{{
    *   import monix.eval._
    *   import scala.concurrent.duration._
    *
    *   Observable.range(0, 100)
    *     .delayExecution(1.second)
    *     .doOnStart { a =>
    *       for {
    *         _ <- Task.sleep(1.second)
    *         _ <- Task(println(s"Started with: $$a"))
    *       } yield ()
    *     }
    * }}}
    *
    * @return a new Observable that executes the specified task
    *         only for the first element
    */
  final def doOnStart(cb: A => Task[Unit]): Observable[A] = {
    // Invoking `cb` is the responsibility of `DoOnStartOperator`
    val operator = new DoOnStartOperator[A](cb)
    self.liftByOperator(operator)
  }

  /** Version of [[doOnStart]] that can work with generic
    * `F[_]` tasks, anything that has a `cats.effect.Effect`
    * instance available.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Task`
    *  - ...
    *
    * {{{
    *   import cats.implicits._
    *   import cats.effect._
    *   import cats.effect.Timer
    *   import scala.concurrent.duration._
    *   import monix.execution.Scheduler.Implicits.global
    *   import monix.catnap.SchedulerEffect
    *   // Needed for IO.sleep
    *   implicit val timer: Timer[IO] = SchedulerEffect.timerLiftIO[IO](global)
    *
    *   Observable.range(0, 100)
    *     .delayExecution(1.second)
    *     .doOnStartF { a =>
    *       for {
    *         _ <- IO.sleep(1.second)
    *         _ <- IO(println(s"Started with: $$a"))
    *       } yield ()
    *     }
    * }}}
    */
  final def doOnStartF[F[_]](cb: A => F[Unit])(implicit F: Effect[F]): Observable[A] =
    doOnStart { first =>
      // Converts the `F[Unit]` result of the callback into a `Task[Unit]`,
      // by means of the given `Effect` instance
      val effect = cb(first)
      Task.fromEffect(effect)(F)
    }

  /** Executes the given callback just _before_ the subscription
    * to the source happens.
    *
    * For example this is equivalent with [[delayExecution]]:
    *
    * {{{
    *   import monix.eval.Task
    *   import scala.concurrent.duration._
    *
    *   Observable.range(0, 10)
    *     .doOnSubscribe(Task.sleep(1.second))
    * }}}
    *
    * @see [[doAfterSubscribe]] for executing a callback just after
    *     a subscription happens.
    */
  final def doOnSubscribe(task: Task[Unit]): Observable[A] = {
    // Uses the `Before` variant of `DoOnSubscribeObservable`
    val hooked = new DoOnSubscribeObservable.Before[A](self, task)
    hooked
  }

  /** Version of [[doOnSubscribe]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * For example this is equivalent with [[delayExecution]]:
    *
    * {{{
    *   import cats.effect._
    *   import cats.effect.Timer
    *   import scala.concurrent.duration._
    *   import monix.execution.Scheduler.Implicits.global
    *   import monix.catnap.SchedulerEffect
    *   // Needed for IO.sleep
    *   implicit val timer: Timer[IO] = SchedulerEffect.timerLiftIO[IO](global)
    *
    *   Observable.range(0, 10)
    *     .doOnSubscribeF(IO.sleep(1.second))
    * }}}
    */
  final def doOnSubscribeF[F[_]](task: F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
    // Converts the `F[Unit]` value into a `Task[Unit]` via `TaskLike`
    val asTask = F(task)
    doOnSubscribe(asTask)
  }

  /** Executes the given callback just _after_ the subscription
    * happens.
    *
    * The executed `Task` executes after the subscription happens
    * and it will delay the first event being emitted. For example
    * this would delay the emitting of the first event by 1 second:
    *
    * {{{
    *   import monix.eval.Task
    *   import scala.concurrent.duration._
    *
    *   Observable.range(0, 100)
    *     .doAfterSubscribe(Task.sleep(1.second))
    * }}}
    *
    * @see [[doOnSubscribe]] for executing a callback just before
    *     a subscription happens.
    */
  final def doAfterSubscribe(task: Task[Unit]): Observable[A] = {
    // Uses the `After` variant of `DoOnSubscribeObservable`
    val hooked = new DoOnSubscribeObservable.After[A](self, task)
    hooked
  }

  /** Version of [[doAfterSubscribe]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * {{{
    *   import cats.effect._
    *   import cats.effect.Timer
    *   import scala.concurrent.duration._
    *   import monix.execution.Scheduler.Implicits.global
    *   import monix.catnap.SchedulerEffect
    *   // Needed for IO.sleep
    *   implicit val timer: Timer[IO] = SchedulerEffect.timerLiftIO[IO](global)
    *
    *   Observable.range(0, 100)
    *     .doAfterSubscribeF(IO.sleep(1.second))
    * }}}
    */
  final def doAfterSubscribeF[F[_]](task: F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
    // Converts the `F[Unit]` value into a `Task[Unit]` via `TaskLike`
    val asTask = F(task)
    doAfterSubscribe(asTask)
  }

  /** Creates a new observable that drops the events of the source, only
    * for the specified `timespan` window.
    *
    * @param timespan the window of time during which the new observable
    *        must drop events emitted by the source
    */
  final def dropByTimespan(timespan: FiniteDuration): Observable[A] = {
    // The time-based dropping logic lives in `DropByTimespanObservable`
    val dropping = new DropByTimespanObservable(self, timespan)
    dropping
  }

  /** Drops the last `n` elements (from the end).
    *
    * @param n the number of elements to drop
    * @return a new Observable that drops the last ''n'' elements
    *         emitted by the source
    */
  final def dropLast(n: Int): Observable[A] =
    self.liftByOperator[A](new DropLastOperator[A](n))

  /** Discard items emitted by the source until a second
    * observable emits an item or completes.
    *
    * If the `trigger` observable completes in error, then the
    * resulting observable will also end in error when it notices
    * it (next time an element is emitted by the source).
    *
    * @param trigger the observable that has to emit an item before the
    *        source begin to be mirrored by the resulting observable
    */
  final def dropUntil(trigger: Observable[Any]): Observable[A] =
    // `trigger` is typed as Observable[Any]: it is passed along only as a
    // signal source, next to the source whose items are mirrored.
    new DropUntilObservable(self, trigger)

  /** Skips the longest prefix of elements for which the given
    * predicate holds and returns a new observable that emits the rest.
    *
    * @param p the predicate tested against the leading elements
    */
  final def dropWhile(p: A => Boolean): Observable[A] =
    self.liftByOperator[A](new DropByPredicateOperator(p, inclusive = false))

  /** Drops the longest prefix of elements that satisfy the given
    * predicate, inclusive of the value that caused `p` to return `false`,
    * and returns a new observable that emits the rest.
    *
    * @param p the predicate tested against the leading elements
    */
  final def dropWhileInclusive(p: A => Boolean): Observable[A] =
    self.liftByOperator[A](new DropByPredicateOperator(p, inclusive = true))

  /** Drops the longest prefix of elements that satisfy the given
    * function and returns a new observable that emits the rest.
    *
    * Unlike [[dropWhile]], the function also receives the zero-based
    * index of the element as its second argument.
    *
    * @param p the predicate, applied to each element and its index
    */
  final def dropWhileWithIndex(p: (A, Int) => Boolean): Observable[A] =
    self.liftByOperator[A](new DropByPredicateWithIndexOperator(p))

  /** Utility that can be used for debugging purposes.
    *
    * @param prefix a label forwarded to `DumpObservable`; presumably
    *        prepended to each dumped line (confirm against
    *        `DumpObservable`)
    * @param out the `PrintStream` forwarded to `DumpObservable`,
    *        `System.out` by default
    */
  final def dump(prefix: String, out: PrintStream = System.out): Observable[A] =
    new DumpObservable[A](self, prefix, out)

  /** Mirror the source observable as long as the source keeps emitting
    * items, otherwise if `timeout` passes without the source emitting
    * anything new then the observable will emit the last item.
    *
    * Note: If the source Observable keeps emitting items more
    * frequently than the length of the time window then the resulting
    * observable will mirror the source exactly.
    *
    * @param timeout the window of silence that must pass in order for the
    *        observable to echo the last item
    */
  final def echoOnce(timeout: FiniteDuration): Observable[A] =
    // onlyOnce = true selects the single-echo mode; echoRepeated passes false.
    new EchoObservable(self, timeout, onlyOnce = true)

  /** Mirror the source observable as long as the source keeps emitting
    * items, otherwise if `timeout` passes without the source emitting
    * anything new then the observable will start emitting the last
    * item repeatedly.
    *
    * Note: If the source Observable keeps emitting items more
    * frequently than the length of the time window then the resulting
    * observable will mirror the source exactly.
    *
    * @param timeout the window of silence that must pass in order for the
    *        observable to start echoing the last item
    */
  final def echoRepeated(timeout: FiniteDuration): Observable[A] =
    // onlyOnce = false selects the repeating mode; echoOnce passes true.
    new EchoObservable(self, timeout, onlyOnce = false)

  /** Creates a new Observable that emits the events of the source and
    * then also emits the given elements, appended to the stream.
    *
    * @param elems the elements to emit after the source's events
    */
  final def endWith[B >: A](elems: Seq[B]): Observable[B] = {
    val suffix: Observable[B] = Observable.fromIterable(elems)
    self.appendAll(suffix)
  }

  /** Concatenates the source with another observable.
    *
    * Ordering of subscription is preserved, so the second observable
    * starts only after the source observable is completed
    * successfully with an `onComplete`. On the other hand, the second
    * observable is never subscribed if the source completes with an
    * error.
    *
    * == Visual Example ==
    *
    * <pre>
    * streamA: a1 -- -- a2 -- -- a3 -- a4 -- --
    * streamB: b1 -- -- b2 -- b3 -- -- -- -- b4
    *
    * result: a1, a2, a3, a4, b1, b2, b3, b4
    * </pre>
    */
  final def ++[B >: A](other: => Observable[B]): Observable[B] = {
    // The by-name `other` is wrapped in `defer` before being handed to
    // `appendAll`, which is the strict variant of this operator.
    val deferred: Observable[B] = Observable.defer(other)
    self.appendAll(deferred)
  }

  /** A strict variant of [[++]]: `other` is taken as a plain value
    * instead of a by-name parameter.
    *
    * @param other the observable to concatenate after the source
    */
  final def appendAll[B >: A](other: Observable[B]): Observable[B] =
    new ConcatObservable[B](self, other)

  /** Emits the given exception instead of `onComplete`.
    *
    * @param error the exception to emit onComplete
    * @return a new Observable that emits an exception onComplete
    */
  final def endWithError(error: Throwable): Observable[A] =
    self.liftByOperator[A](new EndWithErrorOperator[A](error))

  /** Returns an observable that emits a single Throwable, in case an
    * error was thrown by the source, otherwise it isn't going to emit
    * anything.
    */
  final def failed: Observable[Throwable] =
    self.liftByOperator[Throwable](FailedOperator)

  /** Alias for [[headOrElse]]. */
  final def firstOrElse[B >: A](default: => B): Observable[B] =
    self.headOrElse[B](default)

  /** Emits the first element emitted by the source, or otherwise if the
    * source is completed without emitting anything, then the
    * `default` is emitted.
    */
  final def headOrElse[B >: A](default: => B): Observable[B] = {
    // Fold `head` into an Option so that an empty source can be told apart
    // from one that emitted an element.
    val firstOpt = head.foldLeft(Option.empty[B])((_, elem) => Some(elem))
    // `default` stays by-name: it is only evaluated for the `None` case.
    firstOpt.map(_.getOrElse(default))
  }

  /** Returns a new observable that applies the given function
    * to each item emitted by the source and emits the result.
    */
  final def map[B](f: A => B): Observable[B] =
    self.liftByOperator[B](new MapOperator(f))

  /** Applies a function that you supply to each item emitted by the
    * source observable, where that function returns a sequence of elements, and
    * then concatenating those resulting sequences and emitting the
    * results of this concatenation.
    *
    * ==Example==
    * {{{
    *   Observable(1, 2, 3).concatMapIterable( x => List(x, x * 10, x * 100))
    * }}}
    *
    * == Visual Example ==
    *
    * <pre>
    * stream: 1 -- -- 2 -- -- 3 -- --
    * result: 1, 10, 100, 2, 20, 200, 3, 30, 300
    * </pre>
    *
    * @param f is a generator for the sequences being concatenated
    */
  final def concatMapIterable[B](f: A => immutable.Iterable[B]): Observable[B] =
    self.liftByOperator[B](new ConcatMapIterableOperator(f))

  /** Alias for [[concatMapIterable]]
    *
    * NOTE: one primary difference between Monix and other Rx /
    * ReactiveX implementations is that in Monix `flatMap` is an alias
    * for `concatMap` and NOT `mergeMap`.
    */
  final def flatMapIterable[B](f: A => immutable.Iterable[B]): Observable[B] =
    concatMapIterable[B](f)

  /** Alias for [[concatMap]].
    *
    * NOTE: one primary difference between Monix and other Rx /
    * ReactiveX implementations is that in Monix `flatMap` is an alias
    * for `concatMap` and NOT `mergeMap`.
    */
  final def flatMap[B](f: A => Observable[B]): Observable[B] =
    concatMap[B](f)

  /** Applies a function that you supply to each item emitted by the
    * source observable, where that function returns observables, and
    * then concatenating those resulting sequences and emitting the
    * results of this concatenation.
    *
    * This implements the lawful "monadic bind", the `flatMap`
    * operation of [[cats.Monad]].
    *
    * ==Example==
    * {{{
    *   Observable(1, 2, 3).concatMap { x =>
    *     for {
    *       _ <- Observable.eval(println(s"Processing $$x"))
    *       x <- Observable(x, x)
    *     } yield x
    *   }
    * }}}
    *
    * $concatMergeDifference
    *
    * @param f is a generator for the streams being concatenated
    * @return $concatReturn
    */
  final def concatMap[B](f: A => Observable[B]): Observable[B] =
    // Same operator as concatMapDelayErrors but with delayErrors = false;
    // the third constructor argument is passed as null in both.
    new ConcatMapObservable[A, B](self, f, null, delayErrors = false)

  /** Alias of [[concatMapDelayErrors]]. */
  final def flatMapDelayErrors[B](f: A => Observable[B]): Observable[B] =
    self.concatMapDelayErrors[B](f)

  /** Alias of [[switchMap]]. */
  final def flatMapLatest[B](f: A => Observable[B]): Observable[B] =
    switchMap[B](f)

  /** Applies a binary operator to a start value and to elements
    * produced by the source observable, going from left to right,
    * producing and concatenating observables along the way.
    *
    * It's the combination between [[scan]] and [[flatMap]].
    *
    * @see [[flatScan0]] for the version that emits seed element at the beginning
    */
  final def flatScan[R](seed: => R)(op: (R, A) => Observable[R]): Observable[R] = {
    // Wrap the by-name seed in a thunk so it is not evaluated here.
    val initial: () => R = () => seed
    new FlatScanObservable[A, R](self, initial, op, delayErrors = false)
  }

  /** Applies a binary operator to a start value and to elements
    * produced by the source observable, going from left to right,
    * producing and concatenating observables along the way.
    *
    * It's the combination between [[scan0]] and [[flatMap]].
    */
  final def flatScan0[R](seed: => R)(op: (R, A) => Observable[R]): Observable[R] =
    Observable.eval(seed).concatMap { initial =>
      // Emit the evaluated seed first, then continue as flatScan from it.
      initial +: flatScan(initial)(op)
    }

  /** Version of [[flatScan]] that delays the errors from the emitted
    * streams until the source completes.
    *
    * $delayErrorsDescription
    *
    * @see [[flatScan]]
    */
  final def flatScanDelayErrors[R](seed: => R)(op: (R, A) => Observable[R]): Observable[R] = {
    // Wrap the by-name seed in a thunk so it is not evaluated here.
    val initial: () => R = () => seed
    new FlatScanObservable[A, R](self, initial, op, delayErrors = true)
  }

  /** Version of [[flatScan0]] that delays the errors from the emitted
    * streams until the source completes.
    *
    * $delayErrorsDescription
    *
    * @see [[flatScan0]]
    */
  final def flatScan0DelayErrors[R](seed: => R)(op: (R, A) => Observable[R]): Observable[R] =
    Observable.eval(seed).concatMap { initial =>
      // Emit the evaluated seed first, then continue as flatScanDelayErrors from it.
      initial +: flatScanDelayErrors(initial)(op)
    }

  /** $concatDescription
    *
    * Alias for [[Observable!.concat concat]].
    *
    * @return $concatReturn
    */
  final def flatten[B](implicit ev: A <:< Observable[B]): Observable[B] =
    self.concat[B](ev)

  /** $concatDescription
    *
    * ==Equivalence with concatMap==
    *
    * The `concat` operation is basically `concatMap` with the
    * identity function, as you can count on this equivalence:
    *
    * `stream.concat <-> stream.concatMap(x => x)`
    *
    * == Visual Example ==
    *
    * <pre>
    * streamA: a1 -- -- a2 -- -- a3 -- a4 -- --
    * streamB: b1 -- -- b2 -- b3 -- -- -- -- b4
    *
    * result: a1, a2, a3, a4, b1, b2, b3, b4
    * </pre>
    * @return $concatReturn
    */
  final def concat[B](implicit ev: A <:< Observable[B]): Observable[B] =
    // `ev` witnesses A <: Observable[B]; applying it is the identity conversion.
    concatMap[B](inner => ev(inner))

  /** Alias for [[concatDelayErrors]]. */
  final def flattenDelayErrors[B](implicit ev: A <:< Observable[B]): Observable[B] =
    self.concatDelayErrors[B](ev)

  /** Version of [[Observable!.concat concat]] that delays errors emitted by child
    * observables until the stream completes.
    *
    * $delayErrorsDescription
    *
    * ==Example==
    *
    * {{{
    *   val dummy1 = new RuntimeException("dummy1")
    *   val dummy2 = new RuntimeException("dummy2")
    *
    *   val stream = Observable(
    *     Observable(1).endWithError(dummy1),
    *     Observable.raiseError(dummy2),
    *     Observable(2, 3)
    *   )
    *
    *   val concatenated =
    *     stream.concatDelayErrors
    * }}}
    *
    * The resulting stream in this example emits `1, 2, 3` in order
    * and then completes with a `CompositeException` of both `dummy1`
    * and `dummy2`.
    *
    * @return $concatReturn
    */
  final def concatDelayErrors[B](implicit ev: A <:< Observable[B]): Observable[B] =
    // `ev` witnesses A <: Observable[B]; applying it is the identity conversion.
    concatMapDelayErrors[B](inner => ev(inner))

  /** Applies a function that you supply to each item emitted by the
    * source observable, where that function returns sequences
    * and then concatenating those resulting sequences and emitting the
    * results of this concatenation.
    *
    * $delayErrorsDescription
    *
    * ==Example==
    *
    * {{{
    *   val dummy1 = new RuntimeException("dummy1")
    *   val dummy2 = new RuntimeException("dummy2")
    *
    *   Observable(1, 2, 3).concatMapDelayErrors {
    *     case 1 => Observable(1).endWithError(dummy1)
    *     case 2 => Observable.raiseError(dummy2)
    *     case x => Observable(x, x)
    *   }
    * }}}
    *
    * The resulting stream in this example emits `1, 3, 3` in order
    * and then completes with a `CompositeException` of both `dummy1`
    * and `dummy2`.
    *
    * @param f is a generator for the streams being concatenated
    * @return $concatReturn
    */
  final def concatMapDelayErrors[B](f: A => Observable[B]): Observable[B] =
    // Same operator as concatMap but with delayErrors = true;
    // the third constructor argument is passed as null in both.
    new ConcatMapObservable[A, B](self, f, null, delayErrors = true)

  /** Alias for [[switch]]. */
  final def flattenLatest[B](implicit ev: A <:< Observable[B]): Observable[B] =
    self.switch[B](ev)

  /** Convert an observable that emits observables into a single
    * observable that emits the items emitted by the
    * most-recently-emitted of those observables.
    *
    * Similar with [[flatten]], however the source isn't
    * back-pressured when emitting new events. Instead new events
    * being emitted are cancelling the active child observables.
    *
    * ==Equivalence with switchMap==
    *
    * The `switch` operation can be expressed in terms of [[switchMap]],
    * as we have this equivalence:
    *
    * `stream.switch <-> stream.switchMap(x => x)`
    *
    * @see the description of [[switchMap]] for an example.
    */
  final def switch[B](implicit ev: A <:< Observable[B]): Observable[B] =
    // `ev` witnesses A <: Observable[B]; applying it is the identity conversion.
    self.switchMap[B](inner => ev(inner))

  /** Returns an Observable that emits a single boolean, either true, in
    * case the given predicate holds for all the items emitted by the
    * source, or false in case at least one item is not verifying the
    * given predicate.
    *
    * @param p is a function that evaluates the items emitted by the source
    *        Observable, returning `true` if they pass the filter
    * @return an Observable that emits only true or false in case the given
    *         predicate holds or not for all the items
    */
  final def forall(p: A => Boolean): Observable[Boolean] = {
    // "All elements satisfy p" is the negation of "some element violates p".
    val hasCounterExample: Observable[Boolean] = exists(elem => !p(elem))
    hasCounterExample.map(found => !found)
  }

  /** Returns an Observable which emits a single value, either true, in
    * case the given predicate holds for at least one item, or false
    * otherwise.
    *
    * @param p is a function that evaluates the items emitted by the
    *        source Observable, returning `true` if they pass the
    *        filter
    * @return an Observable that emits only true or false in case
    *         the given predicate holds or not for at least one item
    */
  final def exists(p: A => Boolean): Observable[Boolean] = {
    val firstMatch = find(p)
    // Starts from `false`; any element coming out of `find` flips it to `true`.
    firstMatch.foldLeft(false)((_, _) => true)
  }

  /** Groups the items emitted by an Observable according to a specified
    * criterion, and emits these grouped items as GroupedObservables,
    * one GroupedObservable per group.
    *
    * Note: A [[monix.reactive.observables.GroupedObservable GroupedObservable]]
    * will cache the items it is to emit until such time as it is
    * subscribed to. For this reason, in order to avoid memory leaks,
    * you should not simply ignore those GroupedObservables that do
    * not concern you. Instead, you can signal to them that they may
    * discard their buffers by doing something like `source.take(0)`.
    *
    * @param keySelector  a function that extracts the key for each item
    */
  final def groupBy[K](keySelector: A => K)(implicit
    os: Synchronous[Nothing] = OverflowStrategy.Unbounded
  ): Observable[GroupedObservable[K, A]] =
    self.liftByOperator[GroupedObservable[K, A]](new GroupByOperator[A, K](os, keySelector))

  /** Given a routine make sure to execute it whenever the current
    * stream reaches the end, successfully, in error, or canceled.
    *
    * Implements `cats.effect.Bracket.guarantee`.
    *
    * Example: {{{
    *   import monix.eval.Task
    *
    *   Observable.suspend(???).guarantee(Task.eval {
    *     println("Releasing resources!")
    *   })
    * }}}
    *
    * @param f is the task to execute when the stream terminates,
    *        whether successfully, in error, or by cancellation
    */
  final def guarantee(f: Task[Unit]): Observable[A] =
    // The exit case is ignored: the same finalizer runs for every outcome.
    self.guaranteeCase((_: ExitCase[Throwable]) => f)

  /** Version of [[guarantee]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  final def guaranteeF[F[_]](f: F[Unit])(implicit F: TaskLike[F]): Observable[A] =
    self.guarantee(Task.from(f)(F))

  /** Returns a new `Observable` in which `f` is scheduled to be executed
    * when the source is completed, in success, error or when cancelled.
    *
    * Implements `cats.effect.Bracket.guaranteeCase`.
    *
    * This would typically be used to ensure that a finalizer
    * will run at the end of the stream.
    *
    * Example: {{{
    *   import cats.effect.ExitCase
    *   import monix.eval.Task
    *
    *   val stream = Observable.suspend(???).guaranteeCase(err => Task {
    *     err match {
    *       case ExitCase.Completed =>
    *         println("Completed successfully!")
    *       case ExitCase.Error(e) =>
    *         e.printStackTrace()
    *       case ExitCase.Canceled =>
    *         println("Was stopped early!")
    *     }
    *   })
    * }}}
    *
    * NOTE this is using `cats.effect.ExitCase` to signal the termination
    * condition, like this:
    *
    *   - if completed via `onComplete` or via `Stop` signalled by the
    *     consumer, then the function receives `ExitCase.Completed`
    *   - if completed via `onError` or in certain cases in which errors
    *     are detected (e.g. the consumer returns an error), then the function
    *     receives `ExitCase.Error(e)`
    *   - if the subscription was cancelled, then the function receives
    *     `ExitCase.Canceled`
    *
    * In other words `Completed` is for normal termination conditions,
    * `Error` is for exceptions being detected and `Canceled` is for
    * when the subscription gets canceled.
    *
    * @param f is the finalizer to execute when streaming is terminated, by
    *        successful completion, error or cancellation; for specifying the
    *        side effects to use
    */
  final def guaranteeCase(f: ExitCase[Throwable] => Task[Unit]): Observable[A] =
    new GuaranteeCaseObservable[A](self, f)

  /** Version of [[guaranteeCase]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  final def guaranteeCaseF[F[_]](f: ExitCase[Throwable] => F[Unit])(implicit F: TaskLike[F]): Observable[A] =
    self.guaranteeCase(exitCase => Task.from(f(exitCase))(F))

  /** Alias for [[completed]]. Ignores all items emitted by
    * the source and only calls `onComplete` or `onError`.
    *
    * @return an empty sequence that only calls `onComplete` or `onError`,
    *         based on which one is called by the source Observable
    */
  final def ignoreElements: Observable[Nothing] =
    self.liftByOperator[Nothing](CompletedOperator)

  /** Creates a new observable from this observable and another given
    * observable by interleaving their items into a strictly
    * alternating sequence.
    *
    * So the first item emitted by the new observable will be the item
    * emitted by `self`, the second item will be emitted by the other
    * observable, and so forth; when either `self` or `other` calls
    * `onComplete`, the items will then be directly coming from the
    * observable that has not completed; when `onError` is called by
    * either `self` or `other`, the new observable will call `onError`
    * and halt.
    *
    * See [[Observable!.merge merge]] for a more relaxed alternative that doesn't emit
    * items in strict alternating sequence.
    *
    * @param other is an observable that interleaves with the source
    * @return a new observable sequence that alternates emission of
    *         the items from both child streams
    */
  final def interleave[B >: A](other: Observable[B]): Observable[B] =
    // Argument order matters: the source is passed first, `other` second.
    new Interleave2Observable(self, other)

  /** Emits only the last element emitted by the source observable,
    * after which it's completed immediately.
    */
  final def last: Observable[A] =
    self.takeLast(n = 1)

  /** Creates a new observable that only emits the last `n` elements
    * emitted by the source.
    *
    * In case the source triggers an error, then the underlying
    * buffer gets dropped and the error gets emitted immediately.
    */
  final def takeLast(n: Int): Observable[A] =
    if (n > 0) new TakeLastObservable[A](self, n)
    else Observable.empty // nothing to keep for a non-positive `n`

  /** Maps elements from the source using a function that can do
    * asynchronous processing by means of [[monix.eval.Task Task]].
    *
    * Example:
    * {{{
    *   import monix.eval.Task
    *   import scala.concurrent.duration._
    *
    *   Observable.range(0, 100)
    *     .mapEval(x => Task(x).delayExecution(1.second))
    * }}}
    *
    * @see [[mapEvalF]] for a version that works with a generic
    *      `F[_]` (e.g. `cats.effect.IO`, Scala's `Future`),
    *      powered by [[monix.eval.TaskLike]]
    */
  final def mapEval[B](f: A => Task[B]): Observable[B] =
    // Delegates to MapTaskObservable, handing it the source and the
    // task-producing function unchanged.
    new MapTaskObservable[A, B](self, f)

  /** Version of [[mapEval]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * Example:
    * {{{
    *   import cats.implicits._
    *   import cats.effect.IO
    *   import cats.effect.Timer
    *   import scala.concurrent.duration._
    *   import monix.execution.Scheduler.Implicits.global
    *   import monix.catnap.SchedulerEffect
    *   // Needed for IO.sleep
    *   implicit val timer: Timer[IO] = SchedulerEffect.timerLiftIO[IO](global)
    *
    *   Observable.range(0, 100).mapEvalF { x =>
    *     IO.sleep(1.second) *> IO(x)
    *   }
    * }}}
    *
    * @see [[mapEval]] for a version specialized for
    *      [[monix.eval.Task Task]]
    */
  final def mapEvalF[F[_], B](f: A => F[B])(implicit F: TaskLike[F]): Observable[B] =
    self.mapEval[B](elem => F(f(elem)))

  /** Given a mapping function that maps events to [[monix.eval.Task tasks]],
    * applies it in parallel on the source, but with a specified
    * `parallelism`, which indicates the maximum number of tasks that
    * can be executed in parallel returning them preserving original order.
    *
    * Similar in spirit with
    * [[monix.reactive.Consumer.loadBalance[A,R](parallelism* Consumer.loadBalance]],
    * but expressed as an operator that executes [[monix.eval.Task Task]]
    * instances in parallel.
    *
    * Note that when the specified `parallelism` is 1, it has the same
    * behavior as [[mapEval]].
    *
    * @param parallelism is the maximum number of tasks that can be executed
    *        in parallel, over which the source starts being
    *        back-pressured
    *
    * @param f is the mapping function that produces tasks to execute
    *        in parallel, which will eventually produce events for the
    *        resulting observable stream
    *
    * @see [[mapParallelUnordered]] for a variant that does not preserve order
    *     which may lead to faster execution times
    * @see [[mapEval]] for serial execution
    */
  final def mapParallelOrdered[B](parallelism: Int)(f: A => Task[B])(implicit
    os: OverflowStrategy[B] = OverflowStrategy.Default
  ): Observable[B] =
    // The implicit overflow strategy is forwarded explicitly to the operator.
    new MapParallelOrderedObservable[A, B](self, parallelism, f, os)

  /** Version of [[mapParallelOrdered]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * @param parallelism is the maximum number of tasks that can be executed
    *        in parallel, over which the source starts being
    *        back-pressured
    *
    * @param f is the mapping function that produces tasks to execute
    *        in parallel, which will eventually produce events for the
    *        resulting observable stream
    *
    * @see [[mapParallelUnorderedF]] for a variant that does not preserve order
    *     which may lead to faster execution times
    * @see [[mapEvalF]] for serial execution
    */
  final def mapParallelOrderedF[F[_], B](parallelism: Int)(
    f: A => F[B]
  )(implicit os: OverflowStrategy[B] = OverflowStrategy.Default, F: TaskLike[F]): Observable[B] =
    // Lift each F[B] into a Task and reuse the Task-based variant, which
    // builds the same MapParallelOrderedObservable.
    self.mapParallelOrdered[B](parallelism)(elem => F(f(elem)))(os)

  /** Given a mapping function that maps events to [[monix.eval.Task tasks]],
    * applies it in parallel on the source, but with a specified
    * `parallelism`, which indicates the maximum number of tasks that
    * can be executed in parallel.
    *
    * Similar in spirit with
    * [[monix.reactive.Consumer.loadBalance[A,R](parallelism* Consumer.loadBalance]],
    * but expressed as an operator that executes [[monix.eval.Task Task]]
    * instances in parallel.
    *
    * Note that when the specified `parallelism` is 1, it has the same
    * behavior as [[mapEval]].
    *
    * @param parallelism is the maximum number of tasks that can be executed
    *        in parallel, over which the source starts being
    *        back-pressured
    *
    * @param f is the mapping function that produces tasks to execute
    *        in parallel, which will eventually produce events for the
    *        resulting observable stream
    *
    * @see [[mapParallelOrdered]] for a variant that does preserve order
    * @see [[mapEval]] for serial execution
    */
  final def mapParallelUnordered[B](parallelism: Int)(f: A => Task[B])(implicit
    os: OverflowStrategy[B] = OverflowStrategy.Default
  ): Observable[B] =
    // The implicit overflow strategy is forwarded explicitly to the operator.
    new MapParallelUnorderedObservable[A, B](self, parallelism, f, os)

  /** Version of [[mapParallelUnordered]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * Note that when the specified `parallelism` is 1, it has the same
    * behavior as [[mapEval]].
    *
    * @param parallelism is the maximum number of tasks that can be executed
    *        in parallel, over which the source starts being
    *        back-pressured
    *
    * @param f is the mapping function that produces tasks to execute
    *        in parallel, which will eventually produce events for the
    *        resulting observable stream
    *
    * @see [[mapParallelOrdered]] for a variant that does preserve order
    * @see [[mapEval]] for serial execution
    */
  final def mapParallelUnorderedF[F[_], B](parallelism: Int)(
    f: A => F[B]
  )(implicit os: OverflowStrategy[B] = OverflowStrategy.Default, F: TaskLike[F]): Observable[B] =
    // Lift each F[B] into a Task and reuse the Task-based variant, which
    // builds the same MapParallelUnorderedObservable.
    self.mapParallelUnordered[B](parallelism)(elem => F(f(elem)))(os)

  /** Converts the source Observable that emits `A` into an Observable
    * that emits `Notification[A]`.
    */
  final def materialize: Observable[Notification[A]] =
    self.liftByOperator[Notification[A]](new MaterializeOperator[A])

  /** Concurrently merges the observables emitted by the source, into
    * a single observable.
    *
    * ==Equivalence with mergeMap==
    *
    * The `merge` operation is [[mergeMap]] with the identity
    * function:
    *
    * `stream.merge <-> stream.mergeMap(x => x)`
    *
    * $concatMergeDifference
    *
    * == Visual Example ==
    *
    * <pre>
    * streamA: a1 -- -- a2 -- -- a3 -- a4 -- --
    * streamB: b1 -- -- b2 -- b3 -- -- -- -- b4
    *
    * result: a1, b1, a2, b2, b3, a3, a4, b4
    * </pre>
    *
    * @note $defaultOverflowStrategy
    * @return $mergeReturn
    */
  final def merge[B](implicit
    ev: A <:< Observable[B],
    os: OverflowStrategy[B] = OverflowStrategy.Default[B]
  ): Observable[B] =
    // `ev` witnesses A <: Observable[B]; applying it is the identity conversion.
    mergeMap[B](inner => ev(inner))(os)

  /** Concurrently merges the observables emitted by the source with
    * the given generator function into a single observable.
    *
    * $concatMergeDifference
    *
    * ==Example==
    * {{{
    *   Observable(1, 2, 3).mergeMap { x =>
    *     Observable.eval(println(s"Processing $$x"))
    *       .executeAsync
    *       .flatMap(_ => Observable(x, x))
    *   }
    * }}}
    *
    * In this example the source will yield 3 streams and those 3
    * streams are being subscribed immediately, therefore the order of
    * the events will be non-deterministic, as the streams will be
    * evaluated concurrently.
    *
    * == Visual Example ==
    *
    * <pre>
    * streamA: a1 -- -- a2 -- -- a3 -- a4 -- --
    * streamB: b1 -- -- b2 -- b3 -- -- -- -- b4
    *
    * result: a1, b1, a2, b2, b3, a3, a4, b4
    * </pre>
    * @param f is a generator for the streams that will get merged
    * @return $mergeMapReturn
    */
  final def mergeMap[B](f: A => Observable[B])(implicit
    os: OverflowStrategy[B] = OverflowStrategy.Default
  ): Observable[B] =
    // delayErrors = false; mergeMapDelayErrors builds the same operator
    // with delayErrors = true.
    new MergeMapObservable[A, B](self, f, os, delayErrors = false)

  /** $mergeDescription
    *
    * $delayErrorsDescription
    *
    * This is [[mergeMapDelayErrors]] with the identity function:
    *
    * `stream.mergeDelayErrors <-> stream.mergeMapDelayErrors(x => x)`
    *
    * @note $defaultOverflowStrategy
    * @return $mergeReturn
    */
  final def mergeDelayErrors[B](implicit
    ev: A <:< Observable[B],
    os: OverflowStrategy[B] = OverflowStrategy.Default
  ): Observable[B] =
    // Must go through the delay-errors variant: plain `mergeMap` builds the
    // operator with `delayErrors = false`, which would make this method
    // behave exactly like `merge`.
    self.mergeMapDelayErrors(x => x)(os)

  /** $mergeMapDescription
    *
    * $delayErrorsDescription
    *
    * @param f is a generator for the streams that will get merged
    * @return $mergeMapReturn
    */
  final def mergeMapDelayErrors[B](f: A => Observable[B])(implicit
    os: OverflowStrategy[B] = OverflowStrategy.Default
  ): Observable[B] =
    // delayErrors = true; mergeMap builds the same operator with
    // delayErrors = false.
    new MergeMapObservable[A, B](self, f, os, delayErrors = true)

  /** Overrides the default [[monix.execution.Scheduler Scheduler]],
    * possibly forcing an asynchronous boundary on subscription
    * (if `forceAsync` is set to `true`, the default).
    *
    * When an `Observable` is subscribed with
    * [[Observable.subscribe(subscriber* subscribe]],
    * it needs a `Scheduler`, which is going to be injected in the
    * processing pipeline, to be used for managing asynchronous
    * boundaries, scheduling execution with delay, etc.
    *
    * Normally the [[monix.execution.Scheduler Scheduler]] gets injected
    * implicitly when doing `subscribe`, but this operator overrides
    * the injected scheduler for the given source. And if the source is
    * normally using that injected scheduler (given by `subscribe`),
    * then the effect will be that all processing will now happen
    * on the override.
    *
    * To put it in other words, in Monix it's usually the consumer and
    * not the producer that specifies the scheduler and this operator
    * allows for a different behavior.
    *
    * This operator also subsumes the effects of [[subscribeOn]],
    * meaning that the subscription logic itself will start on
    * the provided scheduler if `forceAsync = true` (the default).
    *
    * @see [[observeOn(s:monix\.execution\.Scheduler)* observeOn]]
    *      and [[subscribeOn]].
    *
    * @param s is the [[monix.execution.Scheduler Scheduler]] to use
    *        for overriding the default scheduler and for forcing
    *        an asynchronous boundary if `forceAsync` is `true`
    *
    * @param forceAsync indicates whether an asynchronous boundary
    *        should be forced right before the subscription of the
    *        source `Observable`, managed by the provided `Scheduler`
    *
    * @return a new `Observable` that mirrors the source on subscription,
    *         but that uses the provided scheduler for overriding
    *         the default and possibly force an extra asynchronous
    *         boundary on execution
    */
  final def executeOn(s: Scheduler, forceAsync: Boolean = true): Observable[A] = {
    // Wraps the source together with the overriding scheduler and the
    // flag that says whether an async boundary must be forced.
    val rescheduled: Observable[A] =
      new ExecuteOnObservable[A](self, s, forceAsync)
    rescheduled
  }

  /** Mirrors the source observable, but upon subscription ensure
    * that the evaluation forks into a separate (logical) thread.
    *
    * The execution is managed by the injected
    * [[monix.execution.Scheduler scheduler]] in `subscribe()`.
    */
  final def executeAsync: Observable[A] = {
    // Thin wrapper: the forking logic lives in `ExecuteAsyncObservable`.
    val forked: Observable[A] = new ExecuteAsyncObservable(self)
    forked
  }

  /** Returns a new observable that will execute the source with a different
    * [[monix.execution.ExecutionModel ExecutionModel]].
    *
    * This allows fine-tuning the options injected by the scheduler
    * locally. Example:
    *
    * {{{
    *   import monix.execution.ExecutionModel.AlwaysAsyncExecution
    *
    *   val stream = Observable(1, 2, 3)
    *     .executeWithModel(AlwaysAsyncExecution)
    * }}}
    *
    * @param em is the
    *        [[monix.execution.ExecutionModel ExecutionModel]]
    *        that will be used when evaluating the source.
    */
  final def executeWithModel(em: ExecutionModel): Observable[A] = {
    // Pairs the source with the execution model it should be evaluated with.
    val withModel: Observable[A] =
      new ExecuteWithModelObservable[A](self, em)
    withModel
  }

  /** Operator that specifies a different
    * [[monix.execution.Scheduler Scheduler]], on which subscribers
    * will observe events, instead of the default one.
    *
    * An `Observable` with an applied `observeOn` call will forward
    * events into a buffer that uses the specified `Scheduler`
    * reference to cycle through events and to make `onNext` calls to
    * downstream listeners.
    *
    * Example:
    * {{{
    *   import monix.execution.Scheduler
    *   import monix.execution.Scheduler.Implicits.global
    *   val io = Scheduler.io("my-io")
    *
    *   Observable(1, 2, 3).map(_ + 1)
    *     .observeOn(io)
    *     .foreach(x => println(x))
    * }}}
    *
    * In the above example the first `map` (whatever comes before the
    * `observeOn` call) gets executed using the default `Scheduler`
    * (might execute on the current thread even), however the
    * `foreach` that's specified after `observeOn` will get executed
    * on the indicated `Scheduler`.
    *
    * NOTE: this operator does not guarantee that downstream listeners
    * will actually use the specified `Scheduler` to process events,
    * because this depends on the rest of the pipeline. E.g. this will
    * not work OK:
    *
    * {{{
    *   import monix.reactive.OverflowStrategy.Unbounded
    *
    *   Observable.suspend(???)
    *     .observeOn(io).asyncBoundary(Unbounded)
    * }}}
    *
    * This sample might not do what a user of `observeOn` would
    * want. Indeed the implementation will use the provided `io`
    * reference for calling `onNext` / `onComplete` / `onError`
    * events, however because of the following asynchronous boundary
    * created the actual listeners will probably end up being executed
    * on a different `Scheduler`.
    *
    * The underlying implementation uses
    * [[monix.reactive.observers.BufferedSubscriber a buffer]]
    * to forward events. The
    * [[monix.reactive.OverflowStrategy OverflowStrategy]]
    * being applied is the
    * [[monix.reactive.OverflowStrategy.Default default one]].
    *
    * @see [[observeOn[B>:A](s:monix\.execution\.Scheduler,os:monix\.reactive\.OverflowStrategy[B]* observeOn(Scheduler, OverflowStrategy)]]
    *      for the version that allows customizing the
    *      [[monix.reactive.OverflowStrategy OverflowStrategy]]
    *      being used by the underlying buffer.
    *
    * @param s is the alternative `Scheduler` reference to use
    *        for observing events
    */
  final def observeOn(s: Scheduler): Observable[A] = {
    // Delegates to the two-argument overload, fixing the element type
    // to `A` and the buffer policy to the library default.
    self.observeOn[A](s, OverflowStrategy.Default)
  }

  /** Operator that specifies a different
    * [[monix.execution.Scheduler Scheduler]], on which subscribers
    * will observe events, instead of the default one.
    *
    * This overloaded version of `observeOn` takes an extra
    * [[monix.reactive.OverflowStrategy OverflowStrategy]]
    * parameter specifying the behavior of the underlying buffer.
    *
    * @see [[observeOn(s:monix\.execution\.Scheduler)* observeOn(Scheduler)]] for
    *      the version that does not take an `OverflowStrategy` parameter.
    *
    * @param s is the alternative `Scheduler` reference to use
    *        for observing events
    * @param os is the [[monix.reactive.OverflowStrategy OverflowStrategy]]
    *        to apply to the underlying buffer
    */
  final def observeOn[B >: A](s: Scheduler, os: OverflowStrategy[B]): Observable[B] = {
    // The buffering and re-scheduling is implemented by `ObserveOnObservable`.
    val observed: Observable[B] = new ObserveOnObservable[B](self, s, os)
    observed
  }

  /** If the connection is [[monix.execution.Cancelable.cancel cancelled]]
    * then trigger a `CancellationException`.
    *
    * A connection can be cancelled with the help of the
    * [[monix.execution.Cancelable Cancelable]]
    * returned on [[Observable.subscribe(subscriber* subscribe]].
    *
    * Because the cancellation is effectively concurrent with the
    * signals the [[monix.reactive.Observer Observer]] receives and because
    * we need to uphold the contract, this operator will effectively
    * synchronize access to [[monix.reactive.Observer.onNext onNext]],
    * [[monix.reactive.Observer.onComplete onComplete]] and
    * [[monix.reactive.Observer.onError onError]]. It will also watch
    * out for asynchronous [[monix.execution.Ack.Stop Stop]] events.
    *
    * In other words, this operator does heavy synchronization, can
    * prove to be inefficient and you should avoid using it because
    * the signaled error can interfere with functionality from other
    * operators that use cancellation internally and cancellation in
    * general is a side-effecting operation that should be avoided,
    * unless it's necessary.
    */
  final def onCancelTriggerError: Observable[A] = {
    // Thin wrapper around `OnCancelTriggerErrorObservable`.
    val guarded: Observable[A] = new OnCancelTriggerErrorObservable[A](self)
    guarded
  }

  /** Returns an Observable that mirrors the behavior of the source,
    * unless the source is terminated with an `onError`, in which case
    * the streaming of events continues with the specified backup
    * sequence.
    *
    * The created Observable mirrors the behavior of the source in
    * case the source does not end with an error.
    *
    * NOTE that compared with `onErrorResumeNext` from Rx.NET, the
    * streaming is not resumed in case the source is terminated
    * normally with an `onComplete`.
    *
    * @param that is a backup sequence that's being subscribed
    *        in case the source terminates with an error.
    */
  final def onErrorFallbackTo[B >: A](that: Observable[B]): Observable[B] = {
    // The error itself is irrelevant here: any failure switches to `that`.
    val useBackup: Throwable => Observable[B] = (_: Throwable) => that
    self.onErrorHandleWith[B](useBackup)
  }

  /** Returns an observable that mirrors the behavior of the source,
    * unless the source is terminated with an `onError`, in which
    * case the streaming of events fallbacks to an observable
    * emitting a single element generated by the backup function.
    *
    * See [[onErrorRecover]] for the version that takes a
    * partial function as a parameter.
    *
    * @param f - a function that matches errors with a
    *        backup element that is emitted when the source
    *        throws an error.
    */
  final def onErrorHandle[B >: A](f: Throwable => B): Observable[B] = {
    // Lift the recovered element into a single-element stream, so the
    // observable-returning variant can do the actual work.
    val fallback: Throwable => Observable[B] =
      error => Observable.now(f(error))
    self.onErrorHandleWith(fallback)
  }

  /** Returns an observable that mirrors the behavior of the source,
    * unless the source is terminated with an `onError`, in which
    * case the streaming of events fallbacks to an observable
    * emitting a single element generated by the backup function.
    *
    * The created Observable mirrors the behavior of the source
    * in case the source does not end with an error or if the
    * thrown `Throwable` is not matched.
    *
    * See [[onErrorHandle]] for the version that takes a
    * total function as a parameter.
    *
    * @param pf is a function that matches errors with a
    *        backup element that is emitted when the source
    *        throws an error.
    */
  final def onErrorRecover[B >: A](pf: PartialFunction[Throwable, B]): Observable[B] =
    onErrorHandleWith[B] { error =>
      // `lift` evaluates `pf` exactly once; an unmatched error is re-raised as is.
      pf.lift(error) match {
        case Some(recovered) => Observable.now(recovered)
        case None => Observable.raiseError(error)
      }
    }

  /** Returns an Observable that mirrors the behavior of the source,
    * unless the source is terminated with an `onError`, in which case
    * the streaming of events continues with the specified backup
    * sequence generated by the given function.
    *
    * The created Observable mirrors the behavior of the source in
    * case the source does not end with an error or if the thrown
    * `Throwable` is not matched.
    *
    * See [[onErrorHandleWith]] for the version that takes a
    * total function as a parameter.
    *
    * @param pf is a function that matches errors with a
    *        backup observable that is subscribed when the source
    *        throws an error.
    */
  final def onErrorRecoverWith[B >: A](pf: PartialFunction[Throwable, Observable[B]]): Observable[B] =
    onErrorHandleWith[B] { error =>
      // Errors that `pf` does not match are signaled again, unchanged.
      pf.applyOrElse(error, (unmatched: Throwable) => Observable.raiseError(unmatched))
    }

  /** Returns an Observable that mirrors the behavior of the source,
    * unless the source is terminated with an `onError`, in which case
    * the streaming of events continues with the specified backup
    * sequence generated by the given function.
    *
    * See [[onErrorRecoverWith]] for the version that takes a
    * partial function as a parameter.
    *
    * @param f is a function that matches errors with a
    *        backup observable that is subscribed when the source
    *        throws an error.
    */
  final def onErrorHandleWith[B >: A](f: Throwable => Observable[B]): Observable[B] = {
    // Thin wrapper around `OnErrorRecoverWithObservable`.
    val recovering: Observable[B] = new OnErrorRecoverWithObservable(self, f)
    recovering
  }

  /** Returns an Observable that mirrors the behavior of the source,
    * unless the source is terminated with an `onError`, in which case
    * it tries subscribing to the source again in the hope that it
    * will complete without an error.
    *
    * The number of retries is limited by the specified `maxRetries`
    * parameter, so for an Observable that always ends in error the
    * total number of subscriptions that will eventually happen is
    * `maxRetries + 1`.
    */
  final def onErrorRestart(maxRetries: Long): Observable[A] = {
    // Zero is accepted by the check below (no retries at all), so the
    // requirement is "non-negative" rather than "positive". A negative
    // value fails with an `IllegalArgumentException`.
    require(maxRetries >= 0, "maxRetries should be non-negative")
    new OnErrorRetryCountedObservable(self, maxRetries)
  }

  /** Returns an Observable that mirrors the behavior of the source,
    * unless the source is terminated with an `onError`, in which case
    * it tries subscribing to the source again in the hope that it
    * will complete without an error.
    *
    * The given predicate establishes if the subscription should be
    * retried or not.
    */
  final def onErrorRestartIf(p: Throwable => Boolean): Observable[A] = {
    // The predicate is handed over to `OnErrorRetryIfObservable`.
    val retrying: Observable[A] = new OnErrorRetryIfObservable[A](self, p)
    retrying
  }

  /** Returns an Observable that mirrors the behavior of the source,
    * unless the source is terminated with an `onError`, in which case
    * it tries subscribing to the source again in the hope that it
    * will complete without an error.
    *
    * NOTE: The number of retries is unlimited, so something like
    * `Observable.error(new RuntimeException).onErrorRestartUnlimited`
    * will loop forever.
    */
  final def onErrorRestartUnlimited: Observable[A] = {
    // NOTE(review): the negative count is presumably what
    // `OnErrorRetryCountedObservable` reads as "no limit" - confirm there.
    val noLimit = -1L
    new OnErrorRetryCountedObservable(self, noLimit)
  }

  /** Given a [[monix.reactive.Pipe Pipe]], transform
    * the source observable with it.
    */
  final def pipeThrough[I >: A, B](pipe: Pipe[I, B]): Observable[B] = {
    // Thin wrapper around `PipeThroughObservable`.
    val piped: Observable[B] = new PipeThroughObservable(self, pipe)
    piped
  }

  /** Returns an observable that emits the results of invoking a
    * specified selector on items emitted by a
    * [[monix.reactive.observables.ConnectableObservable ConnectableObservable]]
    * backed by [[monix.reactive.subjects.PublishSubject PublishSubject]]
    * which shares a single subscription to the underlying sequence.
    *
    * This operator takes a possibly pure Observable, transforms it to
    * a hot Observable in the scope of the supplied function and then returns
    * a pure Observable again.
    *
    * ==Example==
    *
    * {{{
    *  import monix.reactive._
    *  import monix.eval.Task
    *  import scala.concurrent.duration._
    *  implicit val os: OverflowStrategy[Nothing] = OverflowStrategy.Default
    *
    *  val obs = Observable(1, 2, 3)
    *    .doOnNext(i => Task(println(s"Produced $$i")).delayExecution(1.second))
    *
    *  def consume(name: String, obs: Observable[Int]): Observable[Unit] =
    *    obs.mapEval(i => Task(println(s"$$name: got $$i")))
    *
    *  obs.publishSelector { hot =>
    *    Observable(
    *      consume("Consumer 1", hot),
    *      consume("Consumer 2", hot).delayExecution(2.second)
    *    ).merge
    *  }
    *
    * }}}
    *
    *  ==Output==
    *
    *   Produced 1
    *   Consumer 1: got 1
    *   Produced 2
    *   Consumer 1: got 2
    *   Consumer 2: got 2
    *   Produced 3
    *   Consumer 1: got 3
    *   Consumer 2: got 3
    *
    *   Note how Consumer 2 received fewer elements because it subscribed later.
    *
    * @param f is a selector function that can use the multicasted source sequence
    *        as many times as needed, without causing multiple subscriptions
    *        to the source sequence. Observers to the given source will
    *        receive all notifications of the source from the time of the
    *        subscription forward.
    * @see [[pipeThroughSelector]] for a version that allows specifying a type of underlying Subject.
    */
  final def publishSelector[R](f: Observable[A] => Observable[R]): Observable[R] = {
    // Specialization of `pipeThroughSelector` using the pipe from `Pipe.publish`.
    val publishPipe = Pipe.publish[A]
    self.pipeThroughSelector(publishPipe, f)
  }

  /** Returns an observable that emits the results of invoking a
    * specified selector on items emitted by a
    * [[monix.reactive.observables.ConnectableObservable ConnectableObservable]],
    * which shares a single subscription to the underlying sequence.
    *
    * This operator takes a possibly pure Observable, transforms it to
    * a hot Observable in the scope of the supplied function and then returns
    * a pure Observable again. The function allows specifying the underlying
    * [[monix.reactive.subjects.Subject]] by means of [[monix.reactive.Pipe]].
    *
    * ==Example==
    *
    * {{{
    *  import monix.reactive._
    *  import monix.eval.Task
    *  import scala.concurrent.duration._
    *  implicit val os: OverflowStrategy[Nothing] = OverflowStrategy.Default
    *
    *  val obs = Observable(1, 2, 3)
    *    .doOnNext(i => Task(println(s"Produced $$i")).delayExecution(1.second))
    *
    *  def consume(name: String, obs: Observable[Int]): Observable[Unit] =
    *    obs.mapEval(i => Task(println(s"$$name: got $$i")))
    *
    *  obs.pipeThroughSelector(Pipe.replay[Int], { (hot: Observable[Int]) =>
    *    Observable(
    *      consume("Consumer 1", hot),
    *      consume("Consumer 2", hot).delayExecution(2.second)
    *    ).merge
    *  })
    *
    * }}}
    *
    *  ==Output==
    *
    *   Produced 1
    *   Consumer 1: got 1
    *   Consumer 2: got 1
    *   Produced 2
    *   Consumer 1: got 2
    *   Consumer 2: got 2
    *   Produced 3
    *   Consumer 1: got 3
    *   Consumer 2: got 3
    *
    *   Note how Consumer 2 received the same amount of elements as
    *   Consumer 1 despite subscribing later because of underlying ReplaySubject.
    *
    * @param pipe is the [[Pipe]] used to transform the source into a multicast
    *        (hot) observable that can be shared in the selector function
    * @param f is a selector function that can use the multicasted source sequence
    *        as many times as needed, without causing multiple subscriptions
    *        to the source sequence. Observers to the given source will
    *        receive all notifications of the source from the time of the
    *        subscription forward.
    */
  final def pipeThroughSelector[S >: A, B, R](pipe: Pipe[S, B], f: Observable[B] => Observable[R]): Observable[R] = {
    // Thin wrapper around `PipeThroughSelectorObservable`.
    val selected: Observable[R] =
      new PipeThroughSelectorObservable[S, B, R](self, pipe, f)
    selected
  }

  /** Applies a binary operator to all elements of this Observable,
    * going left to right, and returns a new Observable that emits
    * only one item before `onComplete`.
    */
  final def reduce[B >: A](op: (B, B) => B): Observable[B] = {
    // The folding itself is implemented by `ReduceOperator`.
    val reducer = new ReduceOperator[B](op)
    self.liftByOperator(reducer)
  }

  /** Repeats the items emitted by the source continuously. It
    * caches the generated items until `onComplete` and repeats them
    * forever.
    *
    * It terminates either on error or if the source is empty.
    */
  final def repeat: Observable[A] = {
    // Thin wrapper around `RepeatSourceObservable`.
    val looped: Observable[A] = new RepeatSourceObservable[A](self)
    looped
  }

  /** Keeps restarting / resubscribing the source until the predicate
    * returns `true` for the first emitted element, after which
    * it starts mirroring the source.
    */
  final def restartUntil(p: A => Boolean): Observable[A] = {
    // The predicate is handed over to `RestartUntilObservable`.
    val restarting: Observable[A] = new RestartUntilObservable[A](self, p)
    restarting
  }

  /** Emit the most recent items emitted by an observable within
    * periodic time intervals. If no new value has been emitted since
    * the last time it was sampled, it signals the last emitted value
    * anyway.
    *
    * @see [[sample]] for a variant that doesn't repeat the last value on silence
    * @see [[sampleRepeatedBy]] for fine control
    * @param period the timespan at which sampling occurs
    */
  final def sampleRepeated(period: FiniteDuration): Observable[A] = {
    // The sampler is a fixed-rate interval; `period` is used both as
    // the initial delay and as the interval between ticks.
    val ticks = Observable.intervalAtFixedRate(period, period)
    self.sampleRepeatedBy(ticks)
  }

  /** Returns an observable that, when the specified sampler observable
    * emits an item or completes, emits the most recently emitted item
    * (if any) emitted by the source Observable since the previous
    * emission from the sampler observable. If no new value has been
    * emitted since the last time it was sampled, it signals the last
    * emitted value anyway.
    *
    * @see [[sampleBy]] for a variant that doesn't repeat the last value on silence
    * @see [[sampleRepeated]] for a periodic sampling
    * @param sampler - the Observable to use for sampling the source Observable
    */
  final def sampleRepeatedBy[B](sampler: Observable[B]): Observable[A] = {
    // Builds a `ThrottleLastObservable` with `shouldRepeatOnSilence` switched on.
    val sampled: Observable[A] =
      new ThrottleLastObservable[A, B](self, sampler, shouldRepeatOnSilence = true)
    sampled
  }

  /** Applies a binary operator to a start value and all elements of
    * this Observable, going left to right and returns a new
    * Observable that emits on each step the result of the applied
    * function.
    *
    * Similar to [[foldLeft]], but emits the state on each
    * step. Useful for modeling finite state machines.
    *
    * @see [[scan0]] for the version that emits seed element at the beginning
    */
  final def scan[S](seed: => S)(op: (S, A) => S): Observable[S] = {
    // The by-name seed is wrapped in a thunk, so it is not evaluated here.
    val initial: () => S = () => seed
    new ScanObservable[A, S](self, initial, op)
  }

  /** Applies a binary operator to a start value and all elements of
    * this Observable, going left to right and returns a new
    * Observable that emits on each step the result element of
    * the applied function.
    *
    * Similar to [[scan]], but the supplied function returns a tuple
    * of the next accumulator state and the result type emitted by
    * the returned observable.
    */
  final def mapAccumulate[S, R](seed: => S)(op: (S, A) => (S, R)): Observable[R] = {
    // The by-name seed is wrapped in a thunk, so it is not evaluated here.
    val initial: () => S = () => seed
    new MapAccumulateObservable[A, S, R](self, initial, op)
  }

  /** Applies a binary operator to a start value and all elements of
    * this Observable, going left to right and returns a new
    * Observable that emits on each step the result of the applied
    * function.
    *
    * This is a version of [[scan]] that emits seed element at the beginning,
    * similar to `scanLeft` on Scala collections
    */
  final def scan0[S](seed: => S)(op: (S, A) => S): Observable[S] =
    Observable.eval(seed).flatMap { initial =>
      // Emit the evaluated seed first, then the regular `scan` output
      // that starts from that same value.
      val states = self.scan(initial)(op)
      initial +: states
    }

  /** Applies a binary operator to a start value and all elements of
    * this stream, going left to right and returns a new stream that
    * emits on each step the result of the applied function.
    *
    * Similar with [[scan]], but this can suspend and evaluate
    * side effects with an `F[_]` data type that implements the
    * `cats.effect.Effect` type class, thus allowing for lazy or
    * asynchronous data processing.
    *
    * Similar to [[foldLeft]] and [[foldWhileLeft]], but emits the
    * state on each step. Useful for modeling finite state machines.
    *
    * Example showing how state can be evolved and acted upon:
    *
    * {{{
    *   // Using cats.effect.IO for evaluating our side effects
    *   import cats.effect.IO
    *
    *   sealed trait State[+A] { def count: Int }
    *   case object Init extends State[Nothing] { def count = 0 }
    *   case class Current[A](current: Option[A], count: Int)
    *     extends State[A]
    *
    *   case class Person(id: Int, name: String)
    *
    *   // TODO: to implement!
    *   def requestPersonDetails(id: Int): IO[Option[Person]] =
    *     IO.raiseError(new NotImplementedError)
    *
    *   // TODO: to implement
    *   val source: Observable[Int] =
    *     Observable.raiseError(new NotImplementedError)
    *
    *   // Initial state
    *   val seed = IO.pure(Init : State[Person])
    *
    *   val scanned = source.scanEvalF(seed) { (state, id) =>
    *     requestPersonDetails(id).map { person =>
    *       state match {
    *         case Init =>
    *           Current(person, 1)
    *         case Current(_, count) =>
    *           Current(person, count + 1)
    *       }
    *     }
    *   }
    *
    *   val filtered = scanned
    *     .takeWhile(_.count < 10)
    *     .collect { case Current(a, _) => a }
    * }}}
    *
    * @see [[scanEval0F]] for the version that emits seed element at the beginning
    *
    * @see [[scan]] for the synchronous, non-lazy version, or
    *      [[scanEval]] for the [[monix.eval.Task Task]]-specialized
    *      version.
    *
    * @param seed is the initial state
    * @param op is the function that evolves the current state
    *
    * @param F is the `cats.effect.Effect` type class implementation
    *        for type `F`, which controls the evaluation. `F` can be
    *        a data type such as [[monix.eval.Task]] or `cats.effect.IO`,
    *        which implement `Effect`.
    *
    * @return a new observable that emits all intermediate states being
    *         resulted from applying the given function
    */
  final def scanEvalF[F[_], S](seed: F[S])(op: (S, A) => F[S])(implicit F: TaskLike[F]): Observable[S] = {
    // Convert both the seed and every step result from `F` into `Task`,
    // then reuse the `Task`-based implementation.
    val seedTask: Task[S] = Task.from(seed)(F)
    val step: (S, A) => Task[S] =
      (state, elem) => Task.from(op(state, elem))(F)
    self.scanEval(seedTask)(step)
  }

  /** Applies a binary operator to a start value and all elements of
    * this stream, going left to right and returns a new stream that
    * emits on each step the result of the applied function.
    *
    * This is a version of [[scanEvalF]] that emits seed element at the beginning,
    * similar to `scanLeft` on Scala collections
    */
  final def scanEval0F[F[_], S](seed: F[S])(
    op: (S, A) => F[S]
  )(implicit F: TaskLike[F], A: Applicative[F]): Observable[S] =
    Observable.fromTaskLike(seed).flatMap { initial =>
      // The seed is evaluated once; its value is emitted first and then
      // fed back (wrapped with `pure`) as the seed of `scanEvalF`.
      val states = self.scanEvalF(A.pure(initial))(op)
      initial +: states
    }

  /** Applies a binary operator to a start value and all elements of
    * this stream, going left to right and returns a new stream that
    * emits on each step the result of the applied function.
    *
    * Similar with [[scan]], but this can suspend and evaluate
    * side effects with [[monix.eval.Task Task]], thus allowing for
    * asynchronous data processing.
    *
    * Similar to [[foldLeft]] and [[foldWhileLeft]], but emits the
    * state on each step. Useful for modeling finite state machines.
    *
    * Example showing how state can be evolved and acted upon:
    *
    * {{{
    *   import monix.eval.Task
    *
    *   sealed trait State[+A] { def count: Int }
    *   case object Init extends State[Nothing] { def count = 0 }
    *   case class Current[A](current: Option[A], count: Int)
    *     extends State[A]
    *
    *   case class Person(id: Int, name: String)
    *
    *   // TODO: to implement!
    *   def requestPersonDetails(id: Int): Task[Option[Person]] =
    *     Task.raiseError(new NotImplementedError)
    *
    *   // TODO: to implement
    *   val source: Observable[Int] =
    *     Observable.raiseError(new NotImplementedError)
    *
    *   // Initial state
    *   val seed = Task.pure(Init : State[Person])
    *
    *   val scanned = source.scanEval(seed) { (state, id) =>
    *     requestPersonDetails(id).map { person =>
    *       state match {
    *         case Init =>
    *           Current(person, 1)
    *         case Current(_, count) =>
    *           Current(person, count + 1)
    *       }
    *     }
    *   }
    *
    *   val filtered = scanned
    *     .takeWhile(_.count < 10)
    *     .collect { case Current(a, _) => a }
    * }}}
    *
    * @see [[scanEval0]] for the version that emits seed element at the beginning
    * @see [[scan]] for the version that does not require using `Task`
    *      in the provided operator
    *
    * @param seed is the initial state
    * @param op is the function that evolves the current state
    *
    * @return a new observable that emits all intermediate states being
    *         resulted from applying the given function
    */
  final def scanEval[S](seed: Task[S])(op: (S, A) => Task[S]): Observable[S] = {
    // Thin wrapper around `ScanTaskObservable`.
    val scanned: Observable[S] = new ScanTaskObservable(self, seed, op)
    scanned
  }

  /** Applies a binary operator to a start value and all elements of
    * this stream, going left to right and returns a new stream that
    * emits on each step the result of the applied function.
    *
    * This is a version of [[scanEval]] that emits seed element at the beginning.
    */
  final def scanEval0[S](seed: Task[S])(op: (S, A) => Task[S]): Observable[S] =
    Observable.fromTask(seed).flatMap { initial =>
      // The seed task runs once; its value is emitted first and then
      // reused (as an already-computed task) to seed `scanEval`.
      val states = self.scanEval(Task.pure(initial))(op)
      initial +: states
    }

  /** Given a mapping function that returns a `B` type for which we have
    * a [[cats.Monoid]] instance, returns a new stream that folds the incoming
    * elements of the sources using the provided `Monoid[B].combine`, with the
    * initial seed being the `Monoid[B].empty` value, emitting the generated values
    * at each step.
    *
    * Equivalent with [[scan]] applied with the given [[cats.Monoid]], so given
    * our `f` mapping function returns a `B`, this law holds:
    *
    * <pre>
    * val B = implicitly[Monoid[B]]
    *
    * stream.scanMap(f) <-> stream.map(f).scan(B.empty)(B.combine)
    * </pre>
    *
    * Example:
    * {{{
    *   import cats.implicits._
    *
    *   // Yields 2, 6, 12, 20, 30, 42
    *   val stream = Observable(1, 2, 3, 4, 5, 6).scanMap(x => x * 2)
    * }}}
    *
    * @param f is the mapping function applied to every incoming element of this `Observable`
    *          before folding using `Monoid[B].combine`
    *
    * @return a new `Observable` that emits all intermediate states being
    *         resulted from applying `Monoid[B].combine` function
    */
  final def scanMap[B](f: A => B)(implicit B: Monoid[B]): Observable[B] =
    self.scan(B.empty) { (acc, elem) =>
      // Map the incoming element first, then fold it into the accumulator.
      val mapped = f(elem)
      B.combine(acc, mapped)
    }

  /** Given a mapping function that returns a `B` type for which we have
    * a [[cats.Monoid]] instance, returns a new stream that folds the incoming
    * elements of the sources using the provided `Monoid[B].combine`, with the
    * initial seed being the `Monoid[B].empty` value, emitting the generated values
    * at each step.
    *
    * This is a version of [[scanMap]] that emits seed element at the beginning.
    */
  final def scanMap0[B](f: A => B)(implicit B: Monoid[B]): Observable[B] = {
    // The monoid's empty value goes first, followed by the `scanMap` output.
    val zero = B.empty
    val folded = self.scanMap(f)
    zero +: folded
  }

  /** Creates a new Observable that emits the given elements and then
    * it also emits the events of the source (prepend operation).
    */
  final def startWith[B >: A](elems: Seq[B]): Observable[B] = {
    // Stream the given elements first, then continue with the source.
    val prefix = Observable.fromIterable(elems)
    prefix.appendAll(self)
  }

  /** Returns a new Observable that uses the specified `Scheduler` for
    * initiating the subscription.
    *
    * This is different from [[executeOn]] because the given `scheduler`
    * is only used to start the subscription, but does not override the
    * default [[monix.execution.Scheduler Scheduler]].
    */
  final def subscribeOn(scheduler: Scheduler): Observable[A] = {
    // Thin wrapper around `SubscribeOnObservable`.
    val shifted: Observable[A] = new SubscribeOnObservable[A](self, scheduler)
    shifted
  }

  /** In case the source is empty, switch to the given backup. */
  final def switchIfEmpty[B >: A](backup: Observable[B]): Observable[B] = {
    // Thin wrapper around `SwitchIfEmptyObservable`.
    val switched: Observable[B] = new SwitchIfEmptyObservable[B](self, backup)
    switched
  }

  /** Drops the first element of the source observable,
    * emitting the rest.
    */
  final def tail: Observable[A] = {
    // Dropping exactly one element; the `Long` literal selects the `drop(Long)` overload.
    val headCount = 1L
    self.drop(headCount)
  }

  /** Overload of [[drop(n:Long* drop(Long)]]. */
  final def drop(n: Int): Observable[A] = {
    // Widen the count to `Long`, which is what `DropFirstOperator` is given.
    val count: Long = n.toLong
    val dropOp: Operator[A, A] = new DropFirstOperator(count)
    self.liftByOperator(dropOp)
  }

  /** Drops the first `n` elements (from the start).
    *
    * @param n the number (Long) of elements to drop
    * @return a new Observable that drops the first ''n'' elements
    *         emitted by the source
    */
  final def drop(n: Long): Observable[A] = {
    // The skipping logic is implemented by `DropFirstOperator`.
    val dropOp: Operator[A, A] = new DropFirstOperator(n)
    self.liftByOperator(dropOp)
  }

  /** Creates a new Observable that emits the events of the source, only
    * for the specified `timespan`, after which it completes.
    *
    * @param timespan the window of time during which the new Observable
    *        is allowed to emit the events of the source
    */
  final def takeByTimespan(timespan: FiniteDuration): Observable[A] = {
    // Thin wrapper around `TakeLeftByTimespanObservable`.
    val windowed: Observable[A] = new TakeLeftByTimespanObservable(self, timespan)
    windowed
  }

  /** Creates a new Observable that emits every n-th event from the source,
    * dropping intermediary events.
    */
  final def takeEveryNth(n: Int): Observable[A] = {
    // The selection logic is implemented by `TakeEveryNthOperator`.
    val everyNth: Operator[A, A] = new TakeEveryNthOperator(n)
    self.liftByOperator(everyNth)
  }

  /** Creates a new observable that mirrors the source until
    * the given `trigger` emits either an element or `onComplete`,
    * after which it is completed.
    *
    * The resulting observable is completed as soon as `trigger`
    * emits either an `onNext` or `onComplete`. If `trigger`
    * emits an `onError`, then the resulting observable is also
    * completed with error.
    *
    * @param trigger is an observable that will cancel the
    *        streaming as soon as it emits an event
    */
  final def takeUntil(trigger: Observable[Any]): Observable[A] = {
    // Thin wrapper around `TakeUntilObservable`.
    val bounded: Observable[A] = new TakeUntilObservable[A](self, trigger)
    bounded
  }

  /** Version of [[takeUntil]] that can work with a trigger expressed by a [[monix.eval.Task]]
    *
    * @see [[takeUntil]] for version that works with Observable.
    * @see [[takeUntilEvalF]] for version that works with generic `F[_]` powered by [[monix.eval.TaskLike]].
    *
    * @param trigger task that will cancel the stream as soon as it completes.
    */
  final def takeUntilEval(trigger: Task[_]): Observable[A] = {
    // Turn the task into an observable, so it can act as the
    // trigger of the `Observable`-based `takeUntil`.
    val signal: Observable[Any] = Observable.fromTask(trigger)
    self.takeUntil(signal)
  }

  /** Version of [[takeUntil]] that can work with a trigger expressed by a generic `F[_]`
    * provided an implicit [[monix.eval.TaskLike]] exists.
    *
    * @see [[takeUntil]] for version that works with Observable.
    * @see [[takeUntilEval]] for version that works with [[monix.eval.Task]].
    *
    * @param trigger operation that will cancel the stream as soon as it completes.
    */
  final def takeUntilEvalF[F[_], B](trigger: F[B])(implicit taskLike: TaskLike[F]): Observable[A] = {
    // Turn the `F[B]` value into an observable (via the implicit `TaskLike`),
    // so it can act as the trigger of the `Observable`-based `takeUntil`.
    val signal: Observable[Any] = Observable.fromTaskLike(trigger)
    self.takeUntil(signal)
  }

  /** Takes longest prefix of elements that satisfy the given predicate
    * and returns a new Observable that emits those elements.
    */
  final def takeWhile(p: A => Boolean): Observable[A] = {
    // `inclusive = false` selects the non-inclusive flavor of the operator
    val byPredicate = new TakeByPredicateOperator(p, inclusive = false)
    liftByOperator(byPredicate)
  }

  /** Takes the longest prefix of elements that satisfy the given predicate, inclusive of
    * the value that caused `p` to return `false`, and returns a new Observable that emits those elements.
    */
  final def takeWhileInclusive(p: A => Boolean): Observable[A] = {
    // Same operator as `takeWhile`, but with the `inclusive` flag switched on
    val byPredicate = new TakeByPredicateOperator(p, inclusive = true)
    liftByOperator(byPredicate)
  }

  /** Takes longest prefix of elements while given [[monix.execution.cancelables.BooleanCancelable BooleanCancelable]]
    * is not canceled and returns a new Observable that emits those elements.
    */
  final def takeWhileNotCanceled(c: BooleanCancelable): Observable[A] = {
    // The operator receives the cancelable `c` that governs the streaming
    liftByOperator(new TakeWhileNotCanceledOperator(c))
  }

  /** Returns an Observable that emits maximum `n` items per given `period`.
    *
    * Unlike [[Observable!.throttleLast]] and [[Observable!.throttleFirst]]
    * it does not discard any elements.
    *
    * If the source observable completes, then the current buffer gets
    * signaled downstream. If the source triggers an error then the
    * current buffer is being dropped and the error gets propagated
    * immediately.
    *
    * Usage:
    *
    * {{{
    *   import scala.concurrent.duration._
    *
    *   // emits two items per second
    *   Observable.fromIterable(0 to 10)
    *     .throttle(1.second, 2)
    * }}}
    *
    * @param  period time that has to pass before emitting new items
    * @param  n      maximum number of items emitted per given `period`
    */
  final def throttle(period: FiniteDuration, n: Int): Observable[A] = {
    // Step 1: gather the events into timed batches bounded by `n`
    val batches = bufferTimedWithPressure[A](period, n)
    // Step 2: flatten every batch back into individual events
    batches.flatMap(batch => Observable.fromIterable(batch))
  }

  /** Returns an Observable that emits only the first item emitted by
    * the source Observable during sequential time windows of a
    * specified duration.
    *
    * This differs from [[Observable!.throttleLast]] in that this only
    * tracks passage of time whereas `throttleLast` ticks at scheduled
    * intervals.
    *
    * Usage:
    *
    * {{{
    *   import scala.concurrent.duration._
    *
    *   // emits 0, 5, 10 in 1 second intervals
    *   Observable.fromIterable(0 to 10)
    *     // without delay, it would return only 0
    *     .delayOnNext(200.millis)
    *     .throttleFirst(1.second)
    * }}}
    *
    * @see [[throttle]] for a version that allows to specify number
    *      of elements processed by a period and does not drop any elements
    * @param interval time to wait before emitting another item after
    *        emitting the last item
    */
  final def throttleFirst(interval: FiniteDuration): Observable[A] = {
    val firstPerInterval = new ThrottleFirstOperator[A](interval)
    liftByOperator(firstPerInterval)
  }

  /** Emit the most recent items emitted by the source within
    * periodic time intervals.
    *
    * Alias for [[sample]].
    *
    * Usage:
    *
    * {{{
    *   import scala.concurrent.duration._
    *
    *   // emits 3, 8, 10 in 1 second intervals
    *   Observable.fromIterable(0 to 10)
    *     // without delay, it would return only 10
    *     .delayOnNext(200.millis)
    *     .throttleLast(1.second)
    * }}}
    *
    * @see [[throttle]] for a version that allows to specify number
    *      of elements processed by a period and does not drop any elements
    * @param period duration of windows within which the last item
    *        emitted by the source Observable will be emitted
    */
  final def throttleLast(period: FiniteDuration): Observable[A] = {
    // Pure alias: all the work is done by `sample`
    self.sample(period)
  }

  /** Emit first element emitted by the source and then
    * emit the most recent items emitted by the source within
    * periodic time intervals.
    *
    * Usage:
    *
    * {{{
    *   import scala.concurrent.duration._
    *
    *   // emits 0 after 200 ms and then 4,9 in 1 sec intervals and 10 after the observable completes
    *   Observable.fromIterable(0 to 10)
    *     // without delay, it would return only 0, 10
    *     .delayOnNext(200.millis)
    *     .throttleLatest(1.second, true)
    * }}}
    *
    * @param period duration of windows within which the last item
    *        emitted by the source Observable will be emitted
    * @param emitLast if true last element will be emitted when source completes
    *                 no matter if interval has passed or not
    */
  final def throttleLatest(period: FiniteDuration, emitLast: Boolean): Observable[A] = {
    val source: Observable[A] = self
    new ThrottleLatestObservable[A](source, period, emitLast)
  }
  /** Emit the most recent items emitted by the source within
    * periodic time intervals.
    *
    * Use the `sample` operator to periodically look at an observable
    * to see what item it has most recently emitted since the previous
    * sampling. Note that if the source observable has emitted no
    * items since the last time it was sampled, the observable that
    * results from the `sample` operator will emit no item for that
    * sampling period.
    *
    * Usage:
    *
    * {{{
    *   import scala.concurrent.duration._
    *
    *   // emits 3, 8, 10 in 1 second intervals
    *   Observable.fromIterable(0 to 10)
    *     // without delay, it would return only 10
    *     .delayOnNext(200.millis)
    *     .sample(1.second)
    * }}}
    *
    * @see [[sampleBy]] for fine control
    * @see [[sampleRepeated]] for repeating the last value on silence
    * @see [[throttle]] for a version that allows to specify number
    *      of elements processed by a period and does not drop any elements
    *
    * @param period the timespan at which sampling occurs
    */
  final def sample(period: FiniteDuration): Observable[A] = {
    // The sampler is a fixed-rate interval; `period` is passed for both
    // of its arguments
    val ticks = Observable.intervalAtFixedRate(period, period)
    sampleBy(ticks)
  }

  /** Returns an observable that, when the specified sampler
    * emits an item or completes, emits the most recently emitted item
    * (if any) emitted by the source since the previous
    * emission from the sampler.
    *
    * Use the `sampleBy` operator to periodically look at an observable
    * to see what item it has most recently emitted since the previous
    * sampling. Note that if the source observable has emitted no
    * items since the last time it was sampled, the observable that
    * results from the `sampleBy` operator will emit no item.
    *
    * @see [[sample]] for periodic sampling
    * @see [[sampleRepeatedBy]] for repeating the last value on silence
    * @param sampler - the observable to use for sampling the source
    */
  final def sampleBy[B](sampler: Observable[B]): Observable[A] = {
    // `shouldRepeatOnSilence` is switched off for this variant
    val source: Observable[A] = self
    new ThrottleLastObservable[A, B](source, sampler, shouldRepeatOnSilence = false)
  }

  /** Only emit an item from an observable if a particular timespan has
    * passed without it emitting another item.
    *
    * Note: If the source observable keeps emitting items more
    * frequently than the length of the time window, then no items will
    * be emitted by the resulting observable.
    *
    * Alias for [[debounce]].
    *
    * @param timeout the length of the window of time that must pass after
    *        the emission of an item from the source observable in
    *        which that observable emits no items in order for the
    *        item to be emitted by the resulting observable
    * @see [[echoOnce]] for a similar operator that also mirrors
    *     the source observable
    */
  final def throttleWithTimeout(timeout: FiniteDuration): Observable[A] = {
    // Pure alias: all the work is done by `debounce`
    self.debounce(timeout)
  }

  /** Only emit an item from an observable if a particular timespan has
    * passed without it emitting another item.
    *
    * Note: If the source observable keeps emitting items more
    * frequently than the length of the time window, then no items will
    * be emitted by the resulting observable.
    *
    * Usage:
    * {{{
    *   import scala.concurrent.duration._
    *
    *   (Observable("M", "O", "N", "I", "X") ++ Observable.never)
    *     .delayOnNext(100.millis)
    *     .scan("")(_ ++ _)
    *     .debounce(200.millis)
    *     .dump("O")
    *
    *   // Output:
    *   // 0: O --> MONIX
    * }}}
    *
    * @param timeout the length of the window of time that must pass after
    *        the emission of an item from the source observable in
    *        which that observable emits no items in order for the
    *        item to be emitted by the resulting observable
    * @see [[echoOnce]] for a similar operator that also mirrors
    *     the source observable
    */
  final def debounce(timeout: FiniteDuration): Observable[A] = {
    // `repeat` is switched off for this variant
    val source: Observable[A] = self
    new DebounceObservable(source, timeout, repeat = false)
  }

  /** Returns an observable that mirrors the source but applies a timeout
    * for each `onNext` message. If downstream subscriber takes more time than the given
    * timespan to process an `onNext` message, the source is terminated and downstream gets
    * subscribed to the given backup.
    *
    * Note that this ignores the time it takes for the upstream to send
    * `onNext` messages. For detecting slow producers see [[timeoutOnSlowUpstream]].
    *
    * @param timeout maximum duration for `onNext`.
    * @param backup alternative data source to subscribe to on timeout.
    */
  final def timeoutOnSlowDownstreamTo[B >: A](timeout: FiniteDuration, backup: Observable[B]): Observable[B] = {
    val guarded = timeoutOnSlowDownstream(timeout)
    guarded.onErrorHandleWith {
      // Only a downstream timeout carrying our own `timeout` value
      // triggers the switch to the backup
      case DownstreamTimeoutException(limit) if timeout == limit =>
        backup
      // Anything else is re-raised untouched
      case unrelated =>
        Observable.raiseError(unrelated)
    }
  }

  /** Returns an observable that mirrors the source but that will trigger a
    * [[monix.execution.exceptions.DownstreamTimeoutException DownstreamTimeoutException]]
    * in case the downstream subscriber takes more than the given timespan
    * to process an `onNext` message.
    *
    * Note that this ignores the time it takes for the upstream to send
    * `onNext` messages. For detecting slow producers see [[timeoutOnSlowUpstream]].
    *
    * @param timeout maximum duration for `onNext`.
    */
  final def timeoutOnSlowDownstream(timeout: FiniteDuration): Observable[A] = {
    val source: Observable[A] = self
    new DownstreamTimeoutObservable[A](source, timeout)
  }

  /** Returns an observable that mirrors the source but applies a timeout
    * for each emitted item by the upstream. If the next item isn't
    * emitted within the specified timeout duration starting from its
    * predecessor, the source is terminated and the downstream gets
    * subscribed to the given backup.
    *
    * Note that this ignores the time it takes to process `onNext`.
    * If dealing with a slow consumer, see [[timeoutOnSlowDownstream]].
    *
    * @param timeout maximum duration between emitted items before
    *        a timeout occurs (ignoring the time it takes to process `onNext`)
    * @param backup is the alternative data source to subscribe to on timeout
    */
  final def timeoutOnSlowUpstreamTo[B >: A](timeout: FiniteDuration, backup: Observable[B]): Observable[B] = {
    val guarded = timeoutOnSlowUpstream(timeout)
    guarded.onErrorHandleWith {
      // Only an upstream timeout carrying our own `timeout` value
      // triggers the switch to the backup
      case UpstreamTimeoutException(limit) if timeout == limit =>
        backup
      // Anything else is re-raised untouched
      case unrelated =>
        Observable.raiseError(unrelated)
    }
  }

  /** Returns an observable that mirrors the source but applies a timeout
    * for each emitted item by the upstream. If the next item isn't
    * emitted within the specified timeout duration starting from its
    * predecessor, the resulting Observable terminates and notifies
    * observers of an
    * [[monix.execution.exceptions.UpstreamTimeoutException UpstreamTimeoutException]].
    *
    * Note that this ignores the time it takes to process `onNext`.
    * If dealing with a slow consumer, see [[timeoutOnSlowDownstream]].
    *
    * @param timeout maximum duration between emitted items before
    *        a timeout occurs (ignoring the time it takes to process `onNext`)
    */
  final def timeoutOnSlowUpstream(timeout: FiniteDuration): Observable[A] = {
    val source: Observable[A] = self
    new UpstreamTimeoutObservable[A](source, timeout)
  }

  /** While the destination observer is busy, buffers events, applying
    * the given overflowStrategy.
    *
    * @param overflowStrategy - $overflowStrategyParam
    */
  final def whileBusyBuffer[B >: A](overflowStrategy: OverflowStrategy.Synchronous[B]): Observable[B] = {
    // Delegates to `asyncBoundary`; this method merely narrows the
    // accepted strategies to the synchronous ones
    self.asyncBoundary[B](overflowStrategy)
  }

  /** Forces a buffered asynchronous boundary.
    * Asynchronous boundary refers to an independent processing
    * of an upstream and a downstream - producer does not have to wait
    * for consumer to acknowledge a new event.
    *
    * Internally it wraps the observer implementation given to
    * `onSubscribe` into a
    * [[monix.reactive.observers.BufferedSubscriber BufferedSubscriber]].
    *
    * Normally Monix's implementation guarantees that events are
    * not emitted concurrently, and that the publisher MUST NOT
    * emit the next event without acknowledgement from the
    * consumer that it may proceed, however for badly behaved
    * publishers, this wrapper provides the guarantee that the
    * downstream [[monix.reactive.Observer Observer]] given in
    * `subscribe` will not receive concurrent events.
    *
    * WARNING: if the buffer created by this operator is
    * unbounded, it can blow up the process if the data source
    * is pushing events faster than what the observer can
    * consume, as it introduces an asynchronous boundary that
    * eliminates the back-pressure requirements of the data
    * source. Unbounded is the default
    * [[monix.reactive.OverflowStrategy overflowStrategy]], see
    * [[monix.reactive.OverflowStrategy OverflowStrategy]] for
    * options.
    *
    * Usage:
    *
    * {{{
    *   import monix.eval.Task
    *   import scala.concurrent.duration._
    *
    *   Observable("A", "B", "C", "D")
    *     .mapEval(i => Task { println(s"1: Processing $$i"); i ++ i })
    *     .asyncBoundary(OverflowStrategy.Unbounded)
    *     .mapEval(i => Task { println(s"2: Processing $$i") }.delayExecution(100.millis))
    *
    *   // Without asyncBoundary it would process A, AA, B, BB, ...
    *   // 1: Processing A
    *   // 1: Processing B
    *   // 1: Processing C
    *   // 1: Processing D
    *   // 2: Processing AA
    *   // 2: Processing BB
    *   // 2: Processing CC
    *   // 2: Processing DD
    * }}}
    *
    * @param overflowStrategy - $overflowStrategyParam
    */
  final def asyncBoundary[B >: A](overflowStrategy: OverflowStrategy[B]): Observable[B] = {
    val boundary = new AsyncBoundaryOperator[B](overflowStrategy)
    self.liftByOperator(boundary)
  }

  /** While the destination observer is busy, drop the incoming events.
    */
  final def whileBusyDropEvents: Observable[A] = {
    val dropper = new WhileBusyDropEventsOperator[A]
    liftByOperator(dropper)
  }

  /** While the destination observer is busy, drop the incoming events.
    * When the downstream recovers, we can signal a special event
    * meant to inform the downstream observer how many events were
    * dropped.
    *
    * @param onOverflow - $onOverflowParam
    */
  final def whileBusyDropEventsAndSignal[B >: A](onOverflow: Long => B): Observable[B] = {
    // `onOverflow` is forwarded as-is to the operator
    val dropper = new WhileBusyDropEventsAndSignalOperator[B](onOverflow)
    liftByOperator(dropper)
  }

  /** Conflates events when a downstream is slower than the upstream.
    *
    * Emits: Immediately when an element is received if the downstream is waiting for elements. Otherwise emits when the
    * downstream stops backpressuring and there is a conflated element available.
    * Back pressures: Never (conflates instead)
    *
    * Usage:
    *
    * {{{
    *   import scala.concurrent.duration._
    *   import cats.data.Chain
    *
    *   // Emits [0], [1, 2], [3, 4]
    *   Observable.range(0, 5)
    *     .throttle(1.second, 1)
    *     .whileBusyAggregateEvents(Chain.apply(_)){ case (chain, ele) => chain.append(ele) }
    *     .throttle(2.seconds, 1)
    * }}}
    *
    */
  def whileBusyAggregateEvents[S](seed: A => S)(aggregate: (S, A) => S): Observable[S] =
    // Both functions are forwarded to the operator, which is then
    // lifted over the source
    liftByOperator(new WhileBusyAggregateEventsOperator[A, S](seed, aggregate))

  /** Reduces elements when a downstream is slower than the upstream.
    *
    * Emits: Immediately when an element is received if the downstream is waiting for elements. Otherwise emits when the
    * downstream stops backpressuring and there is a reduced element available.
    * Back pressures: Never (reduces instead)
    *
    * Usage:
    *
    * {{{
    *   import scala.concurrent.duration._
    *   import cats.data.Chain
    *
    *   // Emits 0, 3 (1+2), 7 (3+4)
    *   Observable.range(0, 5)
    *     .throttle(1.second, 1)
    *     .whileBusyReduceEvents(_ + _)
    *     .throttle(2.seconds, 1)
    * }}}
    *
    */
  final def whileBusyReduceEvents[B >: A](op: (B, B) => B): Observable[B] =
    // Special case of `whileBusyAggregateEvents` in which the seed
    // function returns its input unchanged
    whileBusyAggregateEvents[B](elem => elem)(op)

  /** Combines the elements emitted by the source with the latest element
    * emitted by another observable.
    *
    * Similar to `combineLatest`, but only emits items when the single source
    * emits an item (not when any of the Observables that are passed to the operator
    * do, as combineLatest does).
    *
    * == Visual Example ==
    *
    * <pre>
    * stream1: 1 - - 2 - - 3 - 4 - -
    * stream2: 1 - - 2 - 3 - - - - 4
    *
    * result: (1, 1), (2, 2), (3, 3), (4, 3)
    * </pre>
    *
    * @param other is an observable that gets paired with the source
    * @param f is a mapping function over the generated pairs
    */
  final def withLatestFrom[B, R](other: Observable[B])(f: (A, B) => R): Observable[R] = {
    val source: Observable[A] = self
    new WithLatestFromObservable[A, B, R](source, other, f)
  }

  /** Combines the elements emitted by the source with the latest elements
    * emitted by two observables.
    *
    * Similar to `combineLatest`, but only emits items when the single source
    * emits an item (not when any of the Observables that are passed to the operator
    * do, as combineLatest does).
    *
    * @param o1 is the first observable that gets paired with the source
    * @param o2 is the second observable that gets paired with the source
    * @param f is a mapping function over the generated pairs
    */
  final def withLatestFrom2[B1, B2, R](o1: Observable[B1], o2: Observable[B2])(f: (A, B1, B2) => R): Observable[R] = {
    // Merge the companions into one stream of tuples, so that the
    // single-companion `withLatestFrom` can be reused
    val companions = Observable.combineLatest2(o1, o2)
    withLatestFrom(companions) { (elem, latest) =>
      f(elem, latest._1, latest._2)
    }
  }

  /** Combines the elements emitted by the source with the latest elements
    * emitted by three observables.
    *
    * Similar to `combineLatest`, but only emits items when the single source
    * emits an item (not when any of the Observables that are passed to the operator
    * do, as combineLatest does).
    *
    * @param o1 is the first observable that gets paired with the source
    * @param o2 is the second observable that gets paired with the source
    * @param o3 is the third observable that gets paired with the source
    * @param f is a mapping function over the generated pairs
    */
  final def withLatestFrom3[B1, B2, B3, R](o1: Observable[B1], o2: Observable[B2], o3: Observable[B3])(
    f: (A, B1, B2, B3) => R
  ): Observable[R] = {
    // Merge the companions into one stream of tuples, so that the
    // single-companion `withLatestFrom` can be reused
    val companions = Observable.combineLatest3(o1, o2, o3)
    withLatestFrom(companions) { (elem, latest) =>
      f(elem, latest._1, latest._2, latest._3)
    }
  }

  /** Combines the elements emitted by the source with the latest elements
    * emitted by four observables.
    *
    * Similar to `combineLatest`, but only emits items when the single source
    * emits an item (not when any of the Observables that are passed to the operator
    * do, as combineLatest does).
    *
    * @param o1 is the first observable that gets paired with the source
    * @param o2 is the second observable that gets paired with the source
    * @param o3 is the third observable that gets paired with the source
    * @param o4 is the fourth observable that gets paired with the source
    * @param f is a mapping function over the generated pairs
    */
  final def withLatestFrom4[B1, B2, B3, B4, R](
    o1: Observable[B1],
    o2: Observable[B2],
    o3: Observable[B3],
    o4: Observable[B4]
  )(f: (A, B1, B2, B3, B4) => R): Observable[R] = {
    // Merge the companions into one stream of tuples, so that the
    // single-companion `withLatestFrom` can be reused
    val companions = Observable.combineLatest4(o1, o2, o3, o4)
    withLatestFrom(companions) { (elem, latest) =>
      f(elem, latest._1, latest._2, latest._3, latest._4)
    }
  }

  /** Combines the elements emitted by the source with the latest elements
    * emitted by five observables.
    *
    * Similar to `combineLatest`, but only emits items when the single source
    * emits an item (not when any of the Observables that are passed to the operator
    * do, as combineLatest does).
    *
    * @param o1 is the first observable that gets paired with the source
    * @param o2 is the second observable that gets paired with the source
    * @param o3 is the third observable that gets paired with the source
    * @param o4 is the fourth observable that gets paired with the source
    * @param o5 is the fifth observable that gets paired with the source
    * @param f is a mapping function over the generated pairs
    */
  final def withLatestFrom5[B1, B2, B3, B4, B5, R](
    o1: Observable[B1],
    o2: Observable[B2],
    o3: Observable[B3],
    o4: Observable[B4],
    o5: Observable[B5]
  )(f: (A, B1, B2, B3, B4, B5) => R): Observable[R] = {
    // Merge the companions into one stream of tuples, so that the
    // single-companion `withLatestFrom` can be reused
    val companions = Observable.combineLatest5(o1, o2, o3, o4, o5)
    withLatestFrom(companions) { (elem, latest) =>
      f(elem, latest._1, latest._2, latest._3, latest._4, latest._5)
    }
  }

  /** Combines the elements emitted by the source with the latest elements
    * emitted by six observables.
    *
    * Similar to `combineLatest`, but only emits items when the single source
    * emits an item (not when any of the Observables that are passed to the operator
    * do, as combineLatest does).
    *
    * @param o1 is the first observable that gets paired with the source
    * @param o2 is the second observable that gets paired with the source
    * @param o3 is the third observable that gets paired with the source
    * @param o4 is the fourth observable that gets paired with the source
    * @param o5 is the fifth observable that gets paired with the source
    * @param o6 is the sixth observable that gets paired with the source
    * @param f is a mapping function over the generated pairs
    */
  final def withLatestFrom6[B1, B2, B3, B4, B5, B6, R](
    o1: Observable[B1],
    o2: Observable[B2],
    o3: Observable[B3],
    o4: Observable[B4],
    o5: Observable[B5],
    o6: Observable[B6]
  )(f: (A, B1, B2, B3, B4, B5, B6) => R): Observable[R] = {
    // Merge the companions into one stream of tuples, so that the
    // single-companion `withLatestFrom` can be reused
    val companions = Observable.combineLatest6(o1, o2, o3, o4, o5, o6)
    withLatestFrom(companions) { (elem, latest) =>
      f(elem, latest._1, latest._2, latest._3, latest._4, latest._5, latest._6)
    }
  }

  /** Creates a new observable from this observable and another given
    * observable by combining their items in pairs in a strict sequence.
    *
    * So the first item emitted by the new observable will be the tuple of the
    * first items emitted by each of the source observables; the second item
    * emitted by the new observable will be a tuple with the second items
    * emitted by each of those observables; and so forth.
    *
    * == Visual Example ==
    *
    * <pre>
    * stream1: 1 - - 2 - - 3 - 4 - -
    * stream2: 1 - - 2 - 3 - - - - 4
    *
    * result: (1, 1), (2, 2), (3, 3), (4, 4)
    * </pre>
    *
    * See [[combineLatest]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    *
    * @param other is an observable that gets paired with the source
    * @return a new observable sequence that emits the paired items
    *         of the source observables
    */
  final def zip[B](other: Observable[B]): Observable[(A, B)] = {
    // The combining function simply packs both sides into a tuple
    val source: Observable[A] = self
    new Zip2Observable[A, B, (A, B)](source, other)((left, right) => (left, right))
  }

  /** Creates a new observable from this observable and another given
    * observable by combining their items in pairs in a strict sequence.
    *
    * So the first item emitted by the new observable will be the result
    * of the function applied to the first item emitted by each of
    * the source observables; the second item emitted by the new observable
    * will be the result of the function applied to the second item
    * emitted by each of those observables; and so forth.
    *
    * == Visual Example ==
    *
    * <pre>
    * stream1: 1 - - 2 - - 3 - 4 - -
    * stream2: 1 - - 2 - 3 - - - - 4
    *
    * result: (1, 1), (2, 2), (3, 3), (4, 4)
    * </pre>
    *
    * See [[combineLatestMap]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    *
    * @param other is an observable that gets paired with the source
    * @param f is a mapping function over the generated pairs
    */
  final def zipMap[B, R](other: Observable[B])(f: (A, B) => R): Observable[R] = {
    val source: Observable[A] = self
    new Zip2Observable[A, B, R](source, other)(f)
  }

  /** Zips the emitted elements of the source with their indices. */
  final def zipWithIndex: Observable[(A, Long)] = {
    val indexer = new ZipWithIndexOperator[A]
    liftByOperator(indexer)
  }

  /** Creates a new observable from this observable that will emit a specific `separator`
    * between every pair of elements.
    *
    * Usage sample:
    *
    * {{{
    *   // Yields "a : b : c : d"
    *   Observable("a", "b", "c", "d")
    *     .intersperse(" : ")
    *     .foldLeftL("")(_ ++ _)
    * }}}
    *
    * @param separator is the separator
    */
  final def intersperse[B >: A](separator: B): Observable[B] = {
    // Only the separator is configured; the two optional arguments
    // are left as `None`
    new IntersperseObservable(self, None, separator, None)
  }

  /** Creates a new observable from this observable that will emit the `start` element
    * followed by the upstream elements paired with the `separator`, and lastly the `end` element.
    *
    * Usage sample:
    *
    * {{{
    *   // Yields "begin a : b : c : d end"
    *   Observable("a", "b", "c", "d")
    *     .intersperse("begin ", " : ", " end")
    *     .foldLeftL("")(_ ++ _)
    * }}}
    *
    * @param start is the first element emitted
    * @param separator is the separator
    * @param end the last element emitted
    */
  final def intersperse[B >: A](start: B, separator: B, end: B): Observable[B] = {
    // Both boundary elements are supplied, hence wrapped in `Some`
    val first: Option[B] = Some(start)
    val last: Option[B] = Some(end)
    new IntersperseObservable(self, first, separator, last)
  }

  /** Converts this `Observable` into an `org.reactivestreams.Publisher`.
    *
    * Meant for interoperability with other Reactive Streams
    * implementations.
    *
    * Usage sample:
    *
    * {{{
    *   import monix.eval.Task
    *   import monix.execution.rstreams.SingleAssignSubscription
    *   import org.reactivestreams.{Publisher, Subscriber, Subscription}
    *
    *   def sum(source: Publisher[Int], requestSize: Int): Task[Long] =
    *     Task.create { (_, cb) =>
    *       val sub = SingleAssignSubscription()
    *
    *       source.subscribe(new Subscriber[Int] {
    *         private[this] var requested = 0L
    *         private[this] var sum = 0L
    *
    *         def onSubscribe(s: Subscription): Unit = {
    *           sub := s
    *           requested = requestSize
    *           s.request(requestSize)
    *         }
    *
    *         def onNext(t: Int): Unit = {
    *           sum += t
    *           if (requestSize != Long.MaxValue) requested -= 1
    *
    *           if (requested <= 0) {
    *             requested = requestSize
    *             sub.request(requestSize)
    *           }
    *         }
    *
    *         def onError(t: Throwable): Unit =
    *           cb.onError(t)
    *         def onComplete(): Unit =
    *           cb.onSuccess(sum)
    *       })
    *
    *       // Cancelable that can be used by Task
    *       sub
    *     }
    *
    *   import monix.execution.Scheduler.Implicits.global
    *   val pub = Observable(1, 2, 3, 4).toReactivePublisher
    *
    *   // Yields 10
    *   sum(pub, requestSize = 128)
    * }}}
    *
    * See the [[http://www.reactive-streams.org/ Reactive Streams]]
    * protocol for details.
    */
  final def toReactivePublisher[B >: A](implicit s: Scheduler): RPublisher[B] =
    new RPublisher[B] {
      def subscribe(subscriber: RSubscriber[_ >: B]): Unit = {
        // Created up-front, because the adapted subscriber needs the handle
        // before the subscription to the source exists
        val handle = SingleAssignCancelable()
        handle := self.unsafeSubscribeFn(
          SafeSubscriber(Subscriber.fromReactiveSubscriber(subscriber, handle))
        )
        // The result of the assignment is discarded, this method yields `Unit`
        ()
      }
    }

  /** Returns a [[monix.eval.Task Task]] that upon execution
    * will signal the last generated element of the source observable.
    *
    * Returns an `Option` because the source can be empty.
    */
  final def lastOptionL: Task[Option[A]] = {
    // Wrap every element in `Some`, falling back to `None` when the
    // source emitted nothing
    val wrapped = self.map(Some.apply)
    wrapped.lastOrElseL(None)
  }

  /** Creates a new [[monix.eval.Task Task]] that upon execution
    * will signal the last generated element of the source observable.
    *
    * In case the stream was empty, then the given default gets
    * evaluated and emitted.
    */
  final def lastOrElseL[B >: A](default: => B): Task[B] =
    Task.create { (sc, callback) =>
      self.unsafeSubscribeFn(new Subscriber.Sync[A] {
        implicit val scheduler: Scheduler = sc
        // Most recent element seen so far, meaningful only if `nonEmpty`
        private[this] var latest: A = _
        // Flips to `true` on the first `onNext`
        private[this] var nonEmpty = false

        def onNext(elem: A): Ack = {
          nonEmpty = true
          latest = elem
          Continue
        }

        def onError(ex: Throwable): Unit =
          callback.onError(ex)

        def onComplete(): Unit =
          // `default` is by-name, so it is evaluated inside `Try` and only
          // when the source was empty; an exception thrown by it is
          // signaled through the callback
          if (nonEmpty) callback.onSuccess(latest)
          else callback(Try(default))
      })
    }

  /** Creates a new Observable that emits the total number of `onNext`
    * events that were emitted by the source.
    *
    * Note that this Observable emits only one item after the source
    * is complete.  And in case the source emits an error, then only
    * that error will be emitted.
    */
  final def count: Observable[Long] = {
    // Lift the counting operator over the source
    liftByOperator(CountOperator)
  }

  /** Creates a task that emits the total number of `onNext`
    * events that were emitted by the source.
    */
  final def countL: Task[Long] = {
    // Reuse `count`, then take the first element of the resulting stream
    val counted: Observable[Long] = self.count
    counted.headL
  }

  /** Returns an Observable which only emits the first item for which
    * the predicate holds.
    *
    * @param p is a function that evaluates the items emitted by the
    *        source Observable, returning `true` if they pass the filter
    * @return an Observable that emits only the first item in the original
    *         Observable for which the filter evaluates as `true`
    */
  final def find(p: A => Boolean): Observable[A] = {
    // Keep only the matching elements, then cut the stream after the first
    val matching = self.filter(p)
    matching.head
  }

  /** Returns a task which emits the first item for which
    * the predicate holds.
    *
    * @param p is a function that evaluates the items emitted by the
    *        source observable, returning `true` if they pass the filter
    *
    * @return a task that emits the first item in the source
    *         observable for which the filter evaluates as `true`
    */
  final def findL(p: A => Boolean): Task[Option[A]] = {
    // Same as `find`, but with the result exposed as an optional task value
    val firstMatch = self.find(p)
    firstMatch.headOptionL
  }

  /** Given evidence that type `A` has a `cats.Monoid` implementation,
    * folds the stream with the provided monoid definition.
    *
    * For streams emitting numbers, this effectively sums them up.
    * For strings, this concatenates them.
    *
    * Example:
    *
    * {{{
    *   import cats.implicits._
    *
    *   // Yields 10
    *   val stream1 = Observable(1, 2, 3, 4).fold
    *
    *   // Yields "1234"
    *   val stream2 = Observable("1", "2", "3", "4").fold
    * }}}
    *
    * Note, in case you don't have a `Monoid` instance in scope,
    * but you feel like you should, try this import:
    *
    * {{{
    *   import cats.instances.all._
    * }}}
    *
    * @see [[Observable.foldL foldL]] for the version that returns a
    *      task instead of an observable.
    *
    * @param A is the `cats.Monoid` type class instance that's needed
    *          in scope for folding the source
    *
    * @return the result of combining all elements of the source,
    *         or the defined `Monoid.empty` element in case the
    *         stream is empty
    */
  final def fold[AA >: A](implicit A: Monoid[AA]): Observable[AA] = {
    // NOTE: the term `A` below is the implicit `Monoid` instance,
    // not the element type of the stream
    self.foldLeft(A.empty)((acc, elem) => A.combine(acc, elem))
  }

  /** Given evidence that type `A` has a `cats.Monoid` implementation,
    * folds the stream with the provided monoid definition.
    *
    * For streams emitting numbers, this effectively sums them up.
    * For strings, this concatenates them.
    *
    * Example:
    *
    * {{{
    *   import cats.implicits._
    *
    *   // Yields 10
    *   val stream1 = Observable(1, 2, 3, 4).foldL
    *
    *   // Yields "1234"
    *   val stream2 = Observable("1", "2", "3", "4").foldL
    * }}}
    *
    * @see [[fold]] for the version that returns an observable
    *      instead of a task.
    *
    * @param A is the `cats.Monoid` type class instance that's needed
    *          in scope for folding the source
    *
    * @return the result of combining all elements of the source,
    *         or the defined `Monoid.empty` element in case the
    *         stream is empty
    */
  final def foldL[AA >: A](implicit A: Monoid[AA]): Task[AA] = {
    // Fold as an observable first, then take its single emitted result.
    val folded: Observable[AA] = fold[AA](A)
    folded.headL
  }

  /** Folds the source observable, from start to finish, until the
    * source completes, or until the operator short-circuits the
    * process by returning a `Right`.
    *
    * Note that a call to [[foldLeft]] is equivalent to this function
    * being called with an operator always returning `Left` results.
    *
    * Example: {{{
    *   // Sums first 10 items
    *   val stream1 = Observable.range(0, 1000).foldWhileLeft((0L, 0)) {
    *     case ((sum, count), e) =>
    *       val next = (sum + e, count + 1)
    *       if (count + 1 < 10) Left(next) else Right(next)
    *   }
    *
    *   // Implements exists(predicate)
    *   val stream2 = Observable(1, 2, 3, 4, 5).foldWhileLeft(false) {
    *     (default, e) =>
    *       if (e == 3) Right(true) else Left(default)
    *   }
    *
    *   // Implements forall(predicate)
    *   val stream3 = Observable(1, 2, 3, 4, 5).foldWhileLeft(true) {
    *     (default, e) =>
    *       if (e != 3) Right(false) else Left(default)
    *   }
    * }}}
    *
    * @see [[Observable.foldWhileLeftL foldWhileLeftL]] for a version
    *      that returns a task instead of an observable.
    *
    * @param seed is the initial state, specified as a possibly lazy value;
    *        it gets evaluated when the subscription happens and if it
    *        triggers an error then the subscriber will get immediately
    *        terminated with an error
    *
    * @param op is the binary operator returning either `Left`,
    *        signaling that the state should be evolved or a `Right`,
    *        signaling that the process can be short-circuited and
    *        the result returned immediately
    *
    * @return the result of inserting `op` between consecutive
    *         elements of this observable, going from left to right with
    *         the `seed` as the start value, or `seed` if the observable
    *         is empty
    */
  final def foldWhileLeft[S](seed: => S)(op: (S, A) => Either[S, S]): Observable[S] = {
    // Wrap the by-name seed in a thunk, so it is not evaluated here.
    val initial: () => S = () => seed
    new FoldWhileLeftObservable[A, S](self, initial, op)
  }

  /** Folds the source observable, from start to finish, until the
    * source completes, or until the operator short-circuits the
    * process by returning a `Right`.
    *
    * Note that a call to [[foldLeftL]] is equivalent to this function
    * being called with an operator always returning `Left` results.
    *
    * Example: {{{
    *   // Sums first 10 items
    *   val stream1 = Observable.range(0, 1000).foldWhileLeftL((0L, 0)) {
    *     case ((sum, count), e) =>
    *       val next = (sum + e, count + 1)
    *       if (count + 1 < 10) Left(next) else Right(next)
    *   }
    *
    *   // Implements exists(predicate)
    *   val stream2 = Observable(1, 2, 3, 4, 5).foldWhileLeftL(false) {
    *     (default, e) =>
    *       if (e == 3) Right(true) else Left(default)
    *   }
    *
    *   // Implements forall(predicate)
    *   val stream3 = Observable(1, 2, 3, 4, 5).foldWhileLeftL(true) {
    *     (default, e) =>
    *       if (e != 3) Right(false) else Left(default)
    *   }
    * }}}
    *
    * @see [[foldWhileLeft]] for a version that returns an observable
    *      instead of a task.
    *
    * @param seed is the initial state, specified as a possibly lazy value;
    *        it gets evaluated when the subscription happens and if it
    *        triggers an error then the subscriber will get immediately
    *        terminated with an error
    *
    * @param op is the binary operator returning either `Left`,
    *        signaling that the state should be evolved or a `Right`,
    *        signaling that the process can be short-circuited and
    *        the result returned immediately
    *
    * @return the result of inserting `op` between consecutive
    *         elements of this observable, going from left to right with
    *         the `seed` as the start value, or `seed` if the observable
    *         is empty
    */
  final def foldWhileLeftL[S](seed: => S)(op: (S, A) => Either[S, S]): Task[S] = {
    // `seed` stays lazy: it is forwarded to another by-name parameter.
    val folded = foldWhileLeft(seed)(op)
    folded.headL
  }

  /** Alias for [[firstL]]. */
  final def headL: Task[A] =
    self.firstL

  /** Creates a new [[monix.eval.Task Task]] that upon execution
    * will signal the first generated element of the source observable.
    *
    * In case the stream was empty, then the `Task` gets completed
    * in error with a `NoSuchElementException`.
    */
  final def firstL: Task[A] = {
    // Passed by-name, so this only throws if the fallback actually gets evaluated.
    def onEmpty: Nothing = throw new NoSuchElementException("firstL on empty observable")
    firstOrElseL(onEmpty)
  }

  /** Creates a new [[monix.eval.Task Task]] that upon execution
    * will signal the first generated element of the source observable.
    *
    * In case the stream was empty, then the given default
    * gets evaluated and emitted.
    */
  final def firstOrElseL[B >: A](default: => B): Task[B] =
    Task.create { (sc, callback) =>
      val subscriber: Subscriber.Sync[A] = new Subscriber.Sync[A] {
        implicit val scheduler: Scheduler = sc
        // Set once a result has been handed to the callback.
        private[this] var finished = false

        def onNext(elem: A): Ack = {
          // The first element wins: signal it, then ask upstream to stop.
          callback.onSuccess(elem)
          finished = true
          Stop
        }

        def onError(ex: Throwable): Unit = {
          if (!finished) {
            finished = true
            callback.onError(ex)
          }
        }

        def onComplete(): Unit = {
          if (!finished) {
            finished = true
            // Empty stream: evaluate the fallback, capturing any exception it throws.
            callback(Try(default))
          }
        }
      }

      unsafeSubscribeFn(subscriber)
    }

  /** Returns a `Task` that emits a single boolean, either true, in
    * case the given predicate holds for all the items emitted by the
    * source, or false in case at least one item does not satisfy the
    * given predicate.
    *
    * @param p is a function that evaluates the items emitted by the source
    *        observable, returning `true` if they pass the filter
    * @return a task that emits only true or false in case the given
    *         predicate holds or not for all the items
    */
  final def forallL(p: A => Boolean): Task[Boolean] = {
    // "All satisfy p" is the negation of "some element violates p".
    val hasViolation = existsL(elem => !p(elem))
    hasViolation.map(found => !found)
  }

  /** Returns a `Task` which emits either `true`, in case the given predicate
    * holds for at least one item, or `false` otherwise.
    *
    * @param p is a function that evaluates the items emitted by the
    *        source, returning `true` if they pass the filter
    * @return a task that emits `true` or `false` in case
    *         the given predicate holds or not for at least one item
    */
  final def existsL(p: A => Boolean): Task[Boolean] = {
    // `find` emits at most one item, so the fold flips to `true` only on a match.
    val firstMatch = find(p)
    firstMatch.foldLeftL(false) { (_, _) => true }
  }

  /** Only emits those items for which the given predicate holds.
    *
    * @see [[filterEval]] for a version that works with a [[monix.eval.Task]].
    * @see [[filterEvalF]] for a version that works with a generic
    *      `F[_]` (e.g. `cats.effect.IO`, Scala's `Future`),
    *      powered by [[monix.eval.TaskLike]]
    *
    * @param p a function that evaluates the items emitted by the source
    *        returning `true` if they pass the filter
    * @return a new observable that emits only those items in the source
    *         for which the filter evaluates as `true`
    */
  final def filter(p: A => Boolean): Observable[A] = {
    // The filtering itself is delegated to a dedicated operator.
    val operator = new FilterOperator(p)
    self.liftByOperator(operator)
  }

  /** Alias to [[filter]] to support syntax in for comprehension, i.e.
    *
    * Example: {{{
    *   case class Person(age: Long)
    *
    *   val peopleObservable: Observable[Person] =
    *     Observable.range(1, 100).map(Person.apply)
    *
    *   for {
    *     adult <- peopleObservable if adult.age >= 18
    *   } yield adult
    * }}}
    */
  final def withFilter(p: A => Boolean): Observable[A] = {
    // Pure alias: this is the method name the `for ... if ...` syntax desugars to.
    self.filter(p)
  }

  /** Only emits those items for which the given predicate doesn't hold.
    *
    * @param p a function that evaluates the items emitted by the source
    *        returning `true` if they should be filtered out
    * @return a new observable that emits only those items in the source
    *         for which the filter evaluates as `false`
    */
  final def filterNot(p: A => Boolean): Observable[A] =
    // Keep exactly those elements for which `p` is false.
    filter(elem => !p(elem))

  /** Version of [[filter]] that can work with a predicate expressed by
    * a [[monix.eval.Task]].
    *
    * @see [[filterEvalF]] for a version that works with a generic
    *      `F[_]` (e.g. `cats.effect.IO`, Scala's `Future`),
    *      powered by [[monix.eval.TaskLike]]
    */
  final def filterEval(p: A => Task[Boolean]): Observable[A] = {
    // Pair each element with the outcome of its effectful predicate...
    val tagged = self.mapEval { elem =>
      p(elem).map(keep => (elem, keep))
    }
    // ...then keep only those elements whose predicate evaluated to `true`.
    tagged.collect { case (elem, true) => elem }
  }

  /** Version of [[filterEval]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  final def filterEvalF[F[_]](p: A => F[Boolean])(implicit F: TaskLike[F]): Observable[A] = {
    // Convert the generic effect into a `Task`, then defer to `filterEval`.
    val asTask: A => Task[Boolean] = elem => Task.from(p(elem))(F)
    filterEval(asTask)
  }

  /** Only emits the first element emitted by the source observable,
    * after which it's completed immediately.
    */
  final def head: Observable[A] =
    self.take(1L)

  /** Selects the first `n` elements (from the start).
    *
    * @param  n the number of elements to take
    * @return a new Observable that emits only the first
    *         `n` elements from the source
    */
  final def take(n: Long): Observable[A] = {
    // Non-positive counts short-circuit to the empty observable.
    if (n > 0) self.liftByOperator(new TakeLeftOperator(n))
    else Observable.empty
  }

  /** Applies a binary operator to a start value and all elements of
    * this Observable, going left to right and returns a new
    * Observable that emits only one item before `onComplete`.
    *
    * @param seed is the initial state, specified as a possibly lazy value;
    *        it gets evaluated when the subscription happens and if it triggers
    *        an error then the subscriber will get immediately terminated
    *        with an error
    *
    * @param op is an operator that will fold the signals of the source
    *        observable, returning the next state
    */
  final def foldLeft[R](seed: => R)(op: (R, A) => R): Observable[R] = {
    // Wrap the by-name seed in a thunk, so it is not evaluated here.
    val initial: () => R = () => seed
    new FoldLeftObservable[A, R](self, initial, op)
  }

  /** Applies a binary operator to a start value and all elements of
    * the source, going left to right and returns a new `Task` that
    * upon evaluation will eventually emit the final result.
    */
  final def foldLeftL[R](seed: => R)(op: (R, A) => R): Task[R] = {
    // `seed` stays lazy: it is forwarded to another by-name parameter.
    val folded = foldLeft(seed)(op)
    folded.headL
  }

  /** Alias for [[firstOrElseL]]. */
  final def headOrElseL[B >: A](default: => B): Task[B] =
    self.firstOrElseL[B](default)

  /** Returns a [[monix.eval.Task Task]] that upon execution
    * will signal the last generated element of the source observable.
    *
    * In case the stream was empty, then the `Task` gets completed
    * in error with a `NoSuchElementException`.
    */
  final def lastL: Task[A] = {
    // Passed by-name, so this only throws if the fallback actually gets evaluated.
    def onEmpty: Nothing = throw new NoSuchElementException("lastL")
    lastOrElseL(onEmpty)
  }

  /** Returns a task that emits `true` if the source observable is
    * empty, otherwise `false`.
    */
  final def isEmptyL: Task[Boolean] = {
    val emptiness = isEmpty
    emptiness.headL
  }

  /** Returns an Observable that emits true if the source Observable is
    * empty, otherwise false.
    */
  final def isEmpty: Observable[Boolean] = {
    // The emptiness check is delegated to a shared operator value.
    self.liftByOperator[Boolean](IsEmptyOperator)
  }

  /** Creates a new [[monix.eval.Task Task]] that will consume the
    * source observable and upon completion of the source it will
    * complete with `Unit`.
    */
  final def completedL: Task[Unit] =
    Task.create { (sc, callback) =>
      val subscriber: Subscriber.Sync[A] = new Subscriber.Sync[A] {
        implicit val scheduler: Scheduler = sc
        // Ensures that the callback gets signaled at most once.
        private[this] var finished = false

        // Elements are ignored; only the terminal event matters.
        def onNext(elem: A): Ack = Continue

        def onError(ex: Throwable): Unit = {
          if (!finished) {
            finished = true
            callback.onError(ex)
          }
        }

        def onComplete(): Unit = {
          if (!finished) {
            finished = true
            callback.onSuccess(())
          }
        }
      }

      unsafeSubscribeFn(subscriber)
    }

  /** Polymorphic version of [[completedL]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLift]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  final def completedF[F[_]](implicit F: TaskLift[F]): F[Unit] =
    // Run the `Task`-based version, then lift the result into `F`.
    completedL.to[F](F)

  /** Given a [[cats.Order]] over the stream's elements, returns the
    * maximum element in the stream.
    *
    * ==Example==
    *
    * {{{
    *   // Needed to bring the standard Order instances in scope:
    *   import cats.implicits._
    *
    *   // Yields Some(20)
    *   val stream1 = Observable(10, 7, 6, 8, 20, 3, 5).maxL
    *
    *   // Yields None
    *   val stream2 = Observable.empty[Int].maxL
    * }}}
    *
    * $catsOrderInterop
    *
    * @see [[Observable.max max]] for the version that returns an
    *      observable instead of a `Task`.
    *
    * @param A is the [[cats.Order]] type class instance that's
    *          going to be used for comparing elements
    *
    * @return the maximum element of the source stream, relative
    *         to the defined `Order`
    */
  final def maxL[AA >: A](implicit A: Order[AA]): Task[Option[AA]] = {
    // Yields `None` when the source emits nothing.
    val largest: Observable[AA] = max[AA](A)
    largest.headOptionL
  }

  /** Given a [[cats.Order]] over the stream's elements, returns the
    * maximum element in the stream.
    *
    * ==Example==
    *
    * {{{
    *   // Needed to bring the standard Order instances in scope:
    *   import cats.implicits._
    *
    *   // Yields Observable(20)
    *   val stream1 = Observable(10, 7, 6, 8, 20, 3, 5).max
    *
    *   // Yields Observable.empty
    *   val stream2 = Observable.empty[Int].max
    * }}}
    *
    * $catsOrderInterop
    *
    * @see [[Observable.maxL maxL]] for the version that returns a
    *      [[monix.eval.Task Task]] instead of an observable.
    *
    * @param A is the [[cats.Order]] type class instance that's going
    *          to be used for comparing elements
    *
    * @return the maximum element of the source stream, relative
    *         to the defined `Order`
    */
  final def max[AA >: A](implicit A: Order[AA]): Observable[AA] = {
    // The comparison logic lives in the operator; the `Order` is passed explicitly.
    val operator = new MaxOperator[AA]()(A)
    self.liftByOperator(operator)
  }

  /** Alias for [[firstOptionL]]. */
  final def headOptionL: Task[Option[A]] =
    self.firstOptionL

  /** Creates a new [[monix.eval.Task Task]] that upon execution
    * will signal the first generated element of the source observable.
    *
    * Returns an `Option` because the source can be empty.
    */
  final def firstOptionL: Task[Option[A]] = {
    // Wrap every element in `Some`, falling back to `None` for an empty source.
    val wrapped = map[Option[A]](elem => Some(elem))
    wrapped.firstOrElseL(None)
  }

  /** Takes the elements of the source observable and emits the
    * element that has the maximum key value, where the key is
    * generated by the given function.
    *
    * ==Example==
    *
    * {{{
    *   // Needed to bring the standard Order instances in scope:
    *   import cats.implicits._
    *
    *   case class Person(name: String, age: Int)
    *
    *   // Yields Some(Person("Alex", 34))
    *   Observable(Person("Alex", 34), Person("Alice", 27))
    *     .maxByL(_.age)
    * }}}
    *
    * $catsOrderInterop
    *
    * @see [[Observable.maxBy maxBy]] for the version that returns an
    *      observable instead of a `Task`.
    *
    * @param key is the function that returns the key for which the
    *        given ordering is defined
    *
    * @param K is the [[cats.Order]] type class instance that's going
    *          to be used for comparing elements
    *
    * @return the maximum element of the source stream, relative
    *         to its key generated by the given function and the
    *         given ordering
    */
  final def maxByL[K](key: A => K)(implicit K: Order[K]): Task[Option[A]] = {
    // Yields `None` when the source emits nothing.
    val largest = maxBy(key)(K)
    largest.headOptionL
  }

  /** Takes the elements of the source observable and emits the
    * element that has the maximum key value, where the key is
    * generated by the given function.
    *
    * ==Example==
    *
    * {{{
    *   // Needed to bring the standard Order instances in scope:
    *   import cats.implicits._
    *
    *   case class Person(name: String, age: Int)
    *
    *   // Yields Observable(Person("Alex", 34))
    *   Observable(Person("Alex", 34), Person("Alice", 27))
    *     .maxBy(_.age)
    * }}}
    *
    * $catsOrderInterop
    *
    * @see [[Observable.maxByL maxByL]] for the version that returns a
    *      [[monix.eval.Task Task]] instead of an observable.
    *
    * @param key is the function that returns the key for which the
    *        given ordering is defined
    *
    * @param K is the [[cats.Order]] type class instance that's going
    *          to be used for comparing elements
    *
    * @return the maximum element of the source stream, relative
    *         to its key generated by the given function and the
    *         given ordering
    */
  final def maxBy[K](key: A => K)(implicit K: Order[K]): Observable[A] = {
    // Elements are compared through their extracted keys.
    val operator = new MaxByOperator[A, K](key)(K)
    self.liftByOperator(operator)
  }

  /** Given a [[cats.Order]] over the stream's elements, returns the
    * minimum element in the stream.
    *
    * ==Example==
    *
    * {{{
    *   // Needed to bring the standard Order instances in scope:
    *   import cats.implicits._
    *
    *   // Yields Some(3)
    *   val stream1 =
    *     Observable(10, 7, 6, 8, 20, 3, 5).minL
    *
    *   // Yields None
    *   val stream2 =
    *     Observable.empty[Int].minL
    * }}}
    *
    * $catsOrderInterop
    *
    * @see [[Observable.min min]] for the version that returns an
    *      observable instead of a `Task`.
    *
    * @param A is the [[cats.Order]] type class instance that's going
    *          to be used for comparing elements
    *
    * @return the minimum element of the source stream, relative
    *         to the defined `Order`
    */
  final def minL[AA >: A](implicit A: Order[AA]): Task[Option[AA]] = {
    // Yields `None` when the source emits nothing.
    val smallest: Observable[AA] = min[AA](A)
    smallest.headOptionL
  }

  /** Given a [[cats.Order]] over the stream's elements, returns the
    * minimum element in the stream.
    *
    * ==Example==
    *
    * {{{
    *   // Needed to bring the standard Order instances in scope:
    *   import cats.implicits._
    *
    *   // Yields Observable(3)
    *   val stream1 =
    *     Observable(10, 7, 6, 8, 20, 3, 5).min
    *
    *   // Yields Observable.empty
    *   val stream2 =
    *     Observable.empty[Int].min
    * }}}
    *
    * $catsOrderInterop
    *
    * @see [[Observable.minL minL]] for the version that returns a
    *      [[monix.eval.Task Task]] instead of an observable.
    *
    * @param A is the [[cats.Order]] type class instance that's going
    *          to be used for comparing elements
    *
    * @return the minimum element of the source stream, relative
    *         to the defined `Order`
    */
  final def min[AA >: A](implicit A: Order[AA]): Observable[AA] = {
    // The comparison logic lives in the operator; the `Order` is passed explicitly.
    val operator = new MinOperator[AA]()(A)
    self.liftByOperator(operator)
  }

  /** Takes the elements of the source observable and emits the
    * element that has the minimum key value, where the key is
    * generated by the given function.
    *
    * ==Example==
    *
    * {{{
    *   // Needed to bring the standard Order instances in scope:
    *   import cats.implicits._
    *
    *   case class Person(name: String, age: Int)
    *
    *   // Yields Some(Person("Alice", 27))
    *   Observable(Person("Alex", 34), Person("Alice", 27))
    *     .minByL(_.age)
    * }}}
    *
    * $catsOrderInterop
    *
    * @param key is the function that returns the key for which the
    *        given ordering is defined
    *
    * @param K is the [[cats.Order]] type class instance that's going
    *          to be used for comparing elements
    *
    * @return the minimum element of the source stream, relative
    *         to its key generated by the given function and the
    *         given ordering
    */
  final def minByL[K](key: A => K)(implicit K: Order[K]): Task[Option[A]] = {
    // Yields `None` when the source emits nothing.
    val smallest = minBy(key)(K)
    smallest.headOptionL
  }

  /** Takes the elements of the source observable and emits the
    * element that has the minimum key value, where the key is
    * generated by the given function.
    *
    * Example:
    * {{{
    *   // Needed to bring the standard Order instances in scope:
    *   import cats.implicits._
    *
    *   case class Person(name: String, age: Int)
    *
    *   // Yields Observable(Person("Alice", 27))
    *   val stream = Observable(Person("Alex", 34), Person("Alice", 27))
    *     .minBy(_.age)
    * }}}
    *
    * $catsOrderInterop
    *
    * @param key is the function that returns the key for which the
    *        given ordering is defined
    *
    * @param K is the [[cats.Order]] type class instance that's
    *          going to be used for comparing elements
    *
    * @return the minimum element of the source stream, relative
    *         to its key generated by the given function and the
    *         given ordering
    */
  final def minBy[K](key: A => K)(implicit K: Order[K]): Observable[A] = {
    // Elements are compared through their extracted keys.
    val operator = new MinByOperator[A, K](key)(K)
    self.liftByOperator(operator)
  }

  /** Returns a task that emits `false` if the source observable is
    * empty, otherwise `true`.
    */
  final def nonEmptyL: Task[Boolean] = {
    val hasElements = nonEmpty
    hasElements.headL
  }

  /** Returns an Observable that emits false if the source Observable is
    * empty, otherwise true.
    */
  final def nonEmpty: Observable[Boolean] = {
    // Negation of the emptiness flag produced by the shared operator.
    val emptiness = self.liftByOperator[Boolean](IsEmptyOperator)
    emptiness.map(empty => !empty)
  }

  /** Given a source that emits numeric values, the `sum` operator sums
    * up all values and returns the result.
    */
  final def sumL[B >: A](implicit B: Numeric[B]): Task[B] = {
    // Sum as an observable first, then take its single emitted total.
    val total: Observable[B] = sum[B](B)
    total.headL
  }

  /** Given a source that emits numeric values, the `sum` operator sums
    * up all values and at onComplete it emits the total.
    */
  final def sum[AA >: A](implicit A: Numeric[AA]): Observable[AA] =
    // Start from the numeric zero and add every element on top of it.
    foldLeft[AA](A.zero)((acc, elem) => A.plus(acc, elem))

  /** Returns a `Task` that upon evaluation will collect all items from
    * the source in a Scala `List` and return this list instead.
    *
    * WARNING: for infinite streams the process will eventually blow up
    * with an out of memory error.
    */
  final def toListL: Task[List[A]] = {
    // The seed is by-name, so the mutable buffer is not allocated here.
    val buffered = foldLeftL(mutable.ListBuffer.empty[A]) { (buffer, elem) =>
      buffer += elem
    }
    // Convert to an immutable list before handing the result out.
    buffered.map(_.toList)
  }

  /** Makes the source `Observable` uninterruptible such that a `cancel`
    * signal has no effect.
    *
    * ==Example==
    *
    * {{{
    *   import scala.concurrent.duration._
    *
    *   Observable.eval(println("Hello!"))
    *     .delayExecution(10.seconds)
    *     .uncancelable
    * }}}
    *
    * The created observable, after `subscribe`, will print "Hello!"
    * even if cancellation is attempted.
    */
  final def uncancelable: Observable[A] = {
    // Wraps this observable in the uncancelable decorator.
    val source: Observable[A] = self
    new UncancelableObservable[A](source)
  }

  /** Creates a new [[monix.eval.Task Task]] that will consume the
    * source observable, executing the given callback for each element.
    */
  final def foreachL(cb: A => Unit): Task[Unit] =
    Task.create { (scheduler, onFinish) =>
      // The subscriber receives the per-element function and the task's completion callback.
      val subscriber = new ForeachSubscriber[A](cb, onFinish, scheduler)
      unsafeSubscribeFn(subscriber)
    }

  /** Transforms the source using the given transformer function. */
  def transform[B](transformer: Transformer[A, B]): Observable[B] = {
    // A transformer is a plain function from observable to observable.
    transformer.apply(this)
  }

}

/** Observable builders.
  *
  * @define multicastDesc Creates an input channel and an output observable
  *         pair for building a [[MulticastStrategy multicast]] data-source.
  *
  *         Useful for building [[MulticastStrategy multicast]] observables
  *         from data-sources that cannot be back-pressured.
  *
  *         Prefer [[Observable.create]] when possible.
  *
  * @define fromIteratorDesc Converts any `Iterator` into an observable.
  *
  *         WARNING: reading from an `Iterator` is a destructive process.
  *         Therefore only a single subscriber is supported, the result being
  *         a single-subscriber observable. If multiple subscribers are attempted,
  *         all subscribers, except for the first one, will be terminated with a
  *         [[monix.execution.exceptions.APIContractViolationException APIContractViolationException]].
  *
  *         Therefore, if you need a factory of data sources, from a cold source
  *         from which you can open how many iterators you want,
  *         you can use [[Observable.defer]] to build such a factory. Or you can share
  *         the resulting observable by converting it into a
  *         [[monix.reactive.observables.ConnectableObservable ConnectableObservable]]
  *         by means of [[Observable!.multicast multicast]].
  *
  * @define blocksDefaultSchedulerDesc This operation will start processing on the current
  *         thread (on `subscribe()`), so in order to not block, it might be better to also do an
  *         [[Observable.executeAsync executeAsync]], or you may want to use the
  *         [[monix.execution.ExecutionModel.AlwaysAsyncExecution AlwaysAsyncExecution]]
  *         model, which can be configured per `Scheduler`, see
  *         [[monix.execution.Scheduler.withExecutionModel Scheduler.withExecutionModel]],
  *         or per `Observable`, see [[Observable.executeWithModel]].
  */
object Observable extends ObservableDeprecatedBuilders {
  /** An `Operator` is a function for transforming observers,
    * that can be used for lifting observables.
    *
    * It receives a `Subscriber[O]` and returns a `Subscriber[I]`,
    * being contravariant in `I` and covariant in `O`.
    *
    * See [[Observable.liftByOperator]].
    */
  type Operator[-I, +O] = Subscriber[O] => Subscriber[I]

  /** A `Transformer` is a function used for transforming observables.
    *
    * It maps an `Observable[A]` into an `Observable[B]`, being
    * contravariant in `A` and covariant in `B`.
    *
    * See [[Observable.transform]]
    */
  type Transformer[-A, +B] = Observable[A] => Observable[B]

  /** Given a sequence of elements, builds an observable from it. */
  def apply[A](elems: A*): Observable[A] = {
    // The varargs sequence is handed over as an `Iterable`.
    fromIterable[A](elems)
  }

  /** Lifts an element into the `Observable` context.
    *
    * Alias for [[now]].
    */
  def pure[A](elem: A): Observable[A] = {
    // Strict value: `elem` has already been evaluated by the caller.
    new builders.NowObservable(elem)
  }

  /** Alias for [[eval]]. */
  def delay[A](a: => A): Observable[A] =
    // `a` stays lazy: it is forwarded to another by-name parameter.
    Observable.eval(a)

  /** Given a non-strict value, converts it into an Observable
    * that emits a single element and that memoizes the value
    * for subsequent invocations.
    */
  def evalOnce[A](f: => A): Observable[A] = {
    // The expression is handed over unevaluated to the builder.
    new builders.EvalOnceObservable(f)
  }

  /** Returns an `Observable` that on execution emits the given strict value.
    */
  def now[A](elem: A): Observable[A] = {
    // Strict value: `elem` has already been evaluated by the caller.
    new builders.NowObservable(elem)
  }

  /** Creates an Observable that emits an error.
    */
  def raiseError[A](ex: Throwable): Observable[A] = {
    // The given exception is what the resulting observable carries.
    val failed = new builders.ErrorObservable(ex)
    failed
  }

  /** Given a non-strict value, converts it into an Observable
    * that upon subscription, evaluates the expression and
    * emits a single element.
    */
  def eval[A](a: => A): Observable[A] = {
    // Capture the by-name expression as a thunk; it is not evaluated here.
    val thunk: () => A = () => a
    new builders.EvalAlwaysObservable(thunk)
  }

  /** Lifts a non-strict value into an observable that emits a single element,
    * but upon subscription delay its evaluation by the specified timespan
    */
  def evalDelayed[A](delay: FiniteDuration, a: => A): Observable[A] = {
    // Build the lazy single-element observable first, then postpone its execution.
    val suspended = eval(a)
    suspended.delayExecution(delay)
  }

  /** Creates an Observable that doesn't emit anything and that never
    * completes.
    */
  def never[A]: Observable[A] = {
    // The same stable value is returned for every element type.
    builders.NeverObservable
  }

  /** Reusable value for an `Observable[Unit]` that emits a single
    * event, the implementation for `cats.Applicative.unit`.
    */
  val unit: Observable[Unit] = {
    // Built once and shared by all callers.
    now(())
  }

  /** Keeps calling `f` for each `scala.util.Left` event emitted by
    * the source, concatenating the resulting observables and pushing
    * every `scala.util.Right[B]` event downstream.
    *
    * Based on Phil Freeman's
    * [[http://functorial.com/stack-safety-for-free/index.pdf Stack Safety for Free]].
    *
    * It helps to wrap your head around it if you think of it as being
    * equivalent to this inefficient and unsafe implementation (for `Observable`):
    *
    * {{{
    *   // Don't do this kind of recursion, because `flatMap` can throw
    *   // stack overflow errors:
    *   def tailRecM[A, B](a: A)(f: (A) => Observable[Either[A, B]]): Observable[B] =
    *     f(a).flatMap {
    *       case Right(b) => Observable.pure(b)
    *       case Left(nextA) => tailRecM(nextA)(f)
    *    }
    * }}}
    */
  def tailRecM[A, B](a: A)(f: A => Observable[Either[A, B]]): Observable[B] = {
    // The looping itself is implemented by the dedicated builder.
    val loop = new builders.TailRecMObservable[A, B](a, f)
    loop
  }

  /** Given a subscribe function, lifts it into an [[Observable]].
    *
    * This function is unsafe to use because users have to know and apply
    * the Monix communication contract, related to thread-safety, communicating
    * demand (back-pressure) and error handling.
    *
    * Only use if you know what you're doing. Otherwise prefer [[create]].
    */
  def unsafeCreate[A](f: Subscriber[A] => Cancelable): Observable[A] = {
    // The subscribe function is passed through as-is to the builder.
    new builders.UnsafeCreateObservable(f)
  }

  /** Creates an observable from a function that receives a
    * concurrent and safe
    * [[monix.reactive.observers.Subscriber.Sync Subscriber.Sync]].
    *
    * This builder represents the safe way of building observables
    * from data-sources that cannot be back-pressured.
    *
    * @param overflowStrategy is the [[OverflowStrategy overflow strategy]]
    *        that specifies the type of the underlying buffer (unbounded,
    *        that overflows the head, etc). This parameter can only specify
    *        a "synchronous" strategy, so no back-pressuring allowed.
    *
    * @param producerType (UNSAFE) is the
    *        [[monix.execution.ChannelType.ProducerSide producer type]]
    *        and can be `MultiProducer` or `SingleProducer`, specified as an
    *        optimization option; if you don't know what you're doing, stick to
    *        `MultiProducer`, which says that multiple producers can push
    *        events at the same time, which is the default
    */
  def create[A](
    overflowStrategy: OverflowStrategy.Synchronous[A],
    producerType: ChannelType.ProducerSide = MultiProducer
  )(f: Subscriber.Sync[A] => Cancelable): Observable[A] = {
    // All three settings are forwarded unchanged to the builder.
    new builders.CreateObservable(overflowStrategy, producerType, f)
  }

  /** $multicastDesc
    *
    * @param multicast is the multicast strategy to use (e.g. publish, behavior,
    *        replay, async)
    */
  def multicast[A](multicast: MulticastStrategy[A])(implicit s: Scheduler): (Observer.Sync[A], Observable[A]) = {
    // The same subject instance serves as both the input channel and the output stream.
    val subject = ConcurrentSubject(multicast)
    (subject, subject)
  }

  /** $multicastDesc
    *
    * @param multicast is the multicast strategy to use (e.g. publish, behavior,
    *        replay, async)
    * @param overflow is the overflow strategy for the buffer that gets placed
    *        in front (since this will be a hot data-source that cannot be
    *        back-pressured)
    */
  def multicast[A](multicast: MulticastStrategy[A], overflow: OverflowStrategy.Synchronous[A])(implicit
    s: Scheduler
  ): (Observer.Sync[A], Observable[A]) = {
    // The same subject instance serves as both the input channel and the output stream.
    val subject = ConcurrentSubject(multicast, overflow)
    (subject, subject)
  }

  /** Converts to [[Observable]] from any `F[_]` that has an [[ObservableLike]]
    * instance.
    *
    * Supported types include, but are not necessarily limited to:
    *
    *  - [[cats.Eval]]
    *  - [[https://typelevel.org/cats-effect/datatypes/io.html cats.effect.IO]]
    *  - [[https://typelevel.org/cats-effect/datatypes/syncio.html cats.effect.SyncIO]]
    *  - [[https://typelevel.org/cats-effect/typeclasses/effect.html cats.effect.Effect (Async)]]
    *  - [[https://typelevel.org/cats-effect/typeclasses/concurrent-effect.html cats.effect.ConcurrentEffect]]
    *  - [[https://www.reactive-streams.org/ org.reactivestreams.Publisher]]
    *  - [[monix.eval.Coeval]]
    *  - [[monix.eval.Task]]
    *  - [[scala.Either]]
    *  - [[scala.util.Try]]
    *  - [[scala.concurrent.Future]]
    */
  def from[F[_], A](fa: F[A])(implicit F: ObservableLike[F]): Observable[A] = {
    // The type-class instance performs the actual conversion.
    F(fa)
  }

  /** Converts any `Iterable` into an [[Observable]].
    */
  def fromIterable[A](iterable: Iterable[A]): Observable[A] = {
    // Delegates to the internal `IterableAsObservable` builder.
    val source: Observable[A] = new builders.IterableAsObservable[A](iterable)
    source
  }

  /** Wraps a [[scala.Iterator]] into an `Observable`.
    *
    * This function uses [[monix.eval.Task Task]] in order to suspend
    * the creation of the `Iterator`, because reading from an `Iterator`
    * is a destructive process. The `Task` is being used as a "factory",
    * in place of [[scala.Iterable]].
    *
    * Example:
    * {{{
    *   import monix.eval.Task
    *
    *   Observable.fromIterator(Task(Iterator.from(1)))
    * }}}
    *
    * @see [[fromIterable]]
    *
    * @see [[fromIterator[A](resource* fromIterator(Resource)]] for a version
    *      that uses `cats.effect.Resource`
    *
    * @see [[fromIteratorUnsafe]] for the unsafe version that can wrap an
    *      iterator directly
    */
  def fromIterator[A](task: Task[Iterator[A]]): Observable[A] = {
    // Wrap the iterator only once the task yields it, then flatten the
    // resulting observable-of-observable.
    val wrapped: Task[Observable[A]] = task.map(iterator => fromIteratorUnsafe(iterator))
    Observable.fromTask(wrapped).flatten
  }

  /** Wraps a [[scala.Iterator]] into an `Observable` in the context of a
    * [[https://typelevel.org/cats-effect/datatypes/resource.html cats.effect.Resource]],
    * which allows for specifying a finalizer.
    *
    * @see [[fromIterable]]
    *
    * @see [[fromIterator[A](task* fromIterator(task)]] for a version
    *      that uses [[monix.eval.Task Task]] for suspending side effects
    *
    * @see [[fromIteratorUnsafe]] for the unsafe version that can wrap an
    *      iterator directly
    */
  def fromIterator[A](resource: Resource[Task, Iterator[A]]): Observable[A] = {
    // First expose the resource as an observable, then stream the iterator it yields.
    val acquired: Observable[Iterator[A]] = Observable.fromResource(resource)
    acquired.flatMap(iterator => fromIteratorUnsafe(iterator))
  }

  /** Version of fromIterator that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  def fromIteratorF[F[_], A](iteratorF: F[Iterator[A]])(implicit F: TaskLike[F]): Observable[A] = {
    // Convert the generic effect to a `Task`, then reuse the `Task`-based overload.
    val asTask: Task[Iterator[A]] = F(iteratorF)
    fromIterator(asTask)
  }

  /** Converts any `Iterator` into an observable.
    *
    * '''UNSAFE WARNING''': reading from an `Iterator` is a destructive process.
    * Therefore only a single subscriber is supported, the result being
    * a single-subscriber observable. If multiple subscribers are attempted,
    * all subscribers, except for the first one, will be terminated with a
    * [[monix.execution.exceptions.APIContractViolationException APIContractViolationException]].
    *
    * @see [[fromIterator[A](task* fromIterator(task)]] or
    *      [[fromIterator[A](resource* fromIterator(resource)]]
    *      for safe alternatives
    *
    * @param iterator to transform into an observable
    */
  @UnsafeProtocol
  @UnsafeBecauseImpure
  def fromIteratorUnsafe[A](iterator: Iterator[A]): Observable[A] = {
    // Delegates to the internal `IteratorAsObservable` builder.
    val source: Observable[A] = new builders.IteratorAsObservable[A](iterator)
    source
  }

  /** Wraps a [[scala.Iterator]] into an `Observable` that emits events in `bufferSize` batches.
    *
    * This function uses [[monix.eval.Task Task]] in order to suspend
    * the creation of the `Iterator`, because reading from an `Iterator`
    * is a destructive process. The `Task` is being used as a "factory",
    * in place of [[scala.Iterable]].
    *
    * Example:
    * {{{
    *   import monix.eval.Task
    *
    *   Observable.fromIteratorBuffered(Task(Iterator.from(1)), 2)
    * }}}
    *
    * @see [[fromIterable]]
    *
    * @see [[fromIteratorBuffered[A](resource* fromIteratorBuffered(Resource)]] for a version
    *      that uses `cats.effect.Resource`
    *
    * @see [[fromIteratorBufferedUnsafe]] for the unsafe version that can wrap an
    *      iterator directly
    */
  def fromIteratorBuffered[A](task: Task[Iterator[A]], bufferSize: Int): Observable[Seq[A]] = {
    // Wrap the iterator only once the task yields it, then flatten the
    // resulting observable-of-observable.
    val wrapped: Task[Observable[Seq[A]]] =
      task.map(iterator => fromIteratorBufferedUnsafe(iterator, bufferSize))
    Observable.fromTask(wrapped).flatten
  }

  /** Wraps a [[scala.Iterator]] into an `Observable` in the context of a
    * [[https://typelevel.org/cats-effect/datatypes/resource.html cats.effect.Resource]],
    * which allows for specifying a finalizer.
    *
    * @see [[fromIterable]]
    *
    * @see [[fromIteratorBuffered[A](task* fromIteratorBuffered(task)]] for a version
    *      that uses [[monix.eval.Task Task]] for suspending side effects
    *
    * @see [[fromIteratorBufferedUnsafe]] for the unsafe version that can wrap an
    *      iterator directly
    */
  def fromIteratorBuffered[A](resource: Resource[Task, Iterator[A]], bufferSize: Int): Observable[Seq[A]] = {
    // First expose the resource as an observable, then stream the iterator it yields in batches.
    val acquired: Observable[Iterator[A]] = Observable.fromResource(resource)
    acquired.flatMap(iterator => fromIteratorBufferedUnsafe(iterator, bufferSize))
  }

  /** Converts any `Iterator` into an observable that emits events in `bufferSize` batches.
    *
    * '''UNSAFE WARNING''': reading from an `Iterator` is a destructive process.
    * Therefore only a single subscriber is supported, the result being
    * a single-subscriber observable. If multiple subscribers are attempted,
    * all subscribers, except for the first one, will be terminated with a
    * [[monix.execution.exceptions.APIContractViolationException APIContractViolationException]].
    *
    * @see [[fromIteratorBuffered[A](task* fromIteratorBuffered(task)]] or
    *      [[fromIteratorBuffered[A](resource* fromIteratorBuffered(resource)]]
    *      for safe alternatives
    *
    * @param iterator to transform into an observable
    */
  @UnsafeProtocol
  @UnsafeBecauseImpure
  def fromIteratorBufferedUnsafe[A](iterator: Iterator[A], bufferSize: Int): Observable[Seq[A]] = {
    // Delegates to the internal `BufferedIteratorAsObservable` builder.
    val source: Observable[Seq[A]] = new builders.BufferedIteratorAsObservable[A](iterator, bufferSize)
    source
  }

  /** Transforms any `cats.effect.Resource` into an [[Observable]].
    *
    * See the
    * [[https://typelevel.org/cats-effect/datatypes/resource.html documentation for Resource]].
    *
    * {{{
    *   import cats.effect.Resource
    *   import monix.eval.Task
    *   import java.io._
    *
    *   def openFileAsResource(file: File): Resource[Task, FileInputStream] =
    *     Resource.make(Task(new FileInputStream(file)))(h => Task(h.close()))
    *
    *   def openFileAsStream(file: File): Observable[FileInputStream] =
    *     Observable.fromResource(openFileAsResource(file))
    * }}}
    *
    * This example would be equivalent with usage of [[Observable.resource]]:
    *
    * {{{
    *   def openFileAsResource2(file: File): Observable[FileInputStream] = {
    *     Observable.resource(Task(new FileInputStream(file)))(h => Task(h.close()))
    *   }
    * }}}
    *
    * This means that `flatMap` is safe to use:
    *
    * {{{
    *   def readBytes(file: File): Observable[Array[Byte]] =
    *     openFileAsStream(file).flatMap { in =>
    *       Observable.fromInputStreamUnsafe(in)
    *     }
    * }}}
    */
  def fromResource[F[_], A](resource: Resource[F, A])(implicit F: TaskLike[F]): Observable[A] =
    resource match {
      // Acquisition step: the effect yields the value paired with its finalizer.
      // The finalizer is invoked with the exit case; only the value is emitted.
      case allocate: Resource.Allocate[F, A] @unchecked =>
        Observable
          .resourceCase(F(allocate.resource)) { case ((_, finalizer), exit) => F(finalizer(exit)) }
          .map { case (value, _) => value }

      // Suspended step: evaluate the effect to get the next resource, then recurse.
      case suspended: Resource.Suspend[F, A] @unchecked =>
        Observable.from(suspended.resource).flatMap(next => fromResource(next))

      // Bind step: convert the source resource, then the one derived from its value.
      case bind: Resource.Bind[F, Any, A] @unchecked =>
        fromResource(bind.source).flatMap(value => fromResource(bind.fs(value)))
    }

  /** Safely converts a `java.io.InputStream` into an observable that will
    * emit `Array[Byte]` elements.
    *
    * Compared with [[fromInputStreamUnsafe]], this version:
    *
    *  - is referentially transparent, the input being a "generator"
    *    powered by [[monix.eval.Task]]
    *  - automatically forks execution on subscription to ensure that
    *    the current thread isn't blocked by the ensuing blocking I/O
    *  - ensures that the input stream is closed on completion,
    *    failure or cancellation
    *
    * @param in the `Task[InputStream]` generator to convert into an observable
    * @param chunkSize the maximum length of the emitted arrays of bytes, must be positive
    */
  def fromInputStream(in: Task[InputStream], chunkSize: Int = 4096): Observable[Array[Byte]] = {
    // Acquire the stream as a managed resource whose release action closes it.
    val managed: Observable[InputStream] =
      Observable.resource(in)(stream => Task(stream.close()))
    // Read the acquired stream in chunks.
    val chunks: Observable[Array[Byte]] =
      managed.flatMap(stream => fromInputStreamUnsafe(stream, chunkSize))
    chunks.executeAsync
  }

  /** Version of [[fromInputStream]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  def fromInputStreamF[F[_]](in: F[InputStream], chunkSize: Int = 4096)(implicit
    F: TaskLike[F]
  ): Observable[Array[Byte]] = {
    // Convert the generic effect to a `Task`, then reuse the `Task`-based builder.
    val asTask: Task[InputStream] = F(in)
    fromInputStream(asTask, chunkSize)
  }

  /** Converts a `java.io.InputStream` into an observable that will
    * emit `Array[Byte]` elements.
    *
    * '''UNSAFE WARNING''': this is an unsafe function, because reading from
    * an input stream is a destructive process, also violating
    * referential transparency. Therefore only a single subscriber is
    * supported, the result being a single-subscriber observable. If
    * multiple subscribers are attempted, all subscribers, except for
    * the first one, will be terminated with a
    * [[monix.execution.exceptions.APIContractViolationException APIContractViolationException]].
    *
    * '''UNSAFE PROTOCOL''': the created Observable does not close the given
    * `InputStream`. Usually it's the producer of a resource that needs
    * to deallocate the resource.
    *
    * $blocksDefaultSchedulerDesc
    *
    * @see [[fromInputStream]] for the safe version
    * @param in the `InputStream` to convert into an observable
    * @param chunkSize the maximum length of the emitted arrays of bytes, must be positive
    */
  @UnsafeProtocol
  @UnsafeBecauseImpure
  def fromInputStreamUnsafe(in: InputStream, chunkSize: Int = 4096): Observable[Array[Byte]] = {
    // Delegates to the internal `InputStreamObservable` builder.
    val source: Observable[Array[Byte]] = new builders.InputStreamObservable(in, chunkSize)
    source
  }

  /** Safely converts a `java.io.Reader` into an observable that will
    * emit `Array[Char]` elements.
    *
    * Compared with [[fromCharsReaderUnsafe]], this version:
    *
    *  - is referentially transparent, the input being a "generator"
    *    powered by [[monix.eval.Task]]
    *  - automatically forks execution on subscription to ensure that
    *    the current thread isn't blocked by the ensuing blocking I/O
    *  - ensures that the reader is closed on completion,
    *    failure or cancellation
    *
    * @param in the `Task[Reader]` generator to convert into an observable
    * @param chunkSize the maximum length of the emitted arrays of chars, must be positive
    */
  def fromCharsReader(in: Task[Reader], chunkSize: Int = 4096): Observable[Array[Char]] = {
    // Acquire the reader as a managed resource whose release action closes it.
    val managed: Observable[Reader] =
      Observable.resource(in)(reader => Task(reader.close()))
    // Read the acquired reader in chunks.
    val chunks: Observable[Array[Char]] =
      managed.flatMap(reader => fromCharsReaderUnsafe(reader, chunkSize))
    chunks.executeAsync
  }

  /** Version of [[fromCharsReader]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  def fromCharsReaderF[F[_]](in: F[Reader], chunkSize: Int = 4096)(implicit F: TaskLike[F]): Observable[Array[Char]] = {
    // Convert the generic effect to a `Task`, then reuse the `Task`-based builder.
    val asTask: Task[Reader] = F(in)
    fromCharsReader(asTask, chunkSize)
  }

  /** Converts a `java.io.Reader` into an observable that will emit
    * `Array[Char]` elements.
    *
    * '''UNSAFE WARNING''': this is an unsafe function, because reading from
    * a reader is a destructive process, also violating referential
    * transparency. Therefore only a single subscriber is supported,
    * the result being a single-subscriber observable. If multiple
    * subscribers are attempted, all subscribers, except for the first
    * one, will be terminated with a
    * [[monix.execution.exceptions.APIContractViolationException APIContractViolationException]].
    *
    * '''UNSAFE PROTOCOL''': the created Observable does not close the given
    * `Reader`. Usually it's the producer of a resource that needs
    * to deallocate the resource.
    *
    * $blocksDefaultSchedulerDesc
    *
    * @see [[fromCharsReader]] for the safe version
    *
    * @param in the `Reader` to convert into an observable
    * @param chunkSize the maximum length of the emitted arrays of chars, must be positive
    */
  @UnsafeProtocol
  @UnsafeBecauseImpure
  def fromCharsReaderUnsafe(in: Reader, chunkSize: Int = 4096): Observable[Array[Char]] = {
    // Delegates to the internal `CharsReaderObservable` builder.
    val source: Observable[Array[Char]] = new builders.CharsReaderObservable(in, chunkSize)
    source
  }

  /** Safely converts a `java.io.BufferedReader` into an observable that will
    * emit `String` elements corresponding to text lines from the input.
    *
    * According to the specification of `BufferedReader`, a line is considered
    * to be terminated by any one of a line feed (`\n`), a carriage return (`\r`),
    * or a carriage return followed immediately by a linefeed.
    *
    * Compared with [[fromLinesReaderUnsafe]], this version:
    *
    *  - is referentially transparent, the input being a "generator"
    *    powered by [[monix.eval.Task]]
    *  - automatically forks execution on subscription to ensure that
    *    the current thread isn't blocked by the ensuing blocking I/O
    *  - ensures that the reader is closed on completion,
    *    failure or cancellation
    *
    * @param in is the `Task[BufferedReader]` generator to convert into an observable
    */
  def fromLinesReader(in: Task[BufferedReader]): Observable[String] = {
    // Acquire the reader as a managed resource whose release action closes it.
    val managed: Observable[BufferedReader] =
      Observable.resource(in)(reader => Task(reader.close()))
    // Emit the text lines of the acquired reader.
    val lines: Observable[String] =
      managed.flatMap(reader => fromLinesReaderUnsafe(reader))
    lines.executeAsync
  }

  /** Version of [[fromLinesReader]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  def fromLinesReaderF[F[_]](in: F[BufferedReader])(implicit F: TaskLike[F]): Observable[String] = {
    // Convert the generic effect to a `Task`, then reuse the `Task`-based builder.
    val asTask: Task[BufferedReader] = F(in)
    fromLinesReader(asTask)
  }

  /** Converts a `java.io.BufferedReader` into an observable that will emit
    * `String` text lines from the input.
    *
    * According to the specification of `BufferedReader`, a line is considered
    * to be terminated by any one of a line feed (`\n`), a carriage return (`\r`),
    * or a carriage return followed immediately by a linefeed.
    *
    * '''UNSAFE WARNING''': this is an unsafe function, because reading from
    * a reader is a destructive process, also violating referential
    * transparency. Therefore only a single subscriber is supported,
    * the result being a single-subscriber observable. If multiple
    * subscribers are attempted, all subscribers, except for the first
    * one, will be terminated with a
    * [[monix.execution.exceptions.APIContractViolationException APIContractViolationException]].
    *
    * '''UNSAFE PROTOCOL''': the created Observable does not close the given
    * `BufferedReader`. Usually it's the producer of a resource that needs
    * to deallocate the resource.
    *
    * @see [[fromLinesReader]] for the safe version
    *
    * @param in is the `BufferedReader` to convert into an observable
    */
  @UnsafeProtocol
  @UnsafeBecauseImpure
  def fromLinesReaderUnsafe(in: BufferedReader): Observable[String] = {
    // Delegates to the internal `LinesReaderObservable` builder.
    val source: Observable[String] = new builders.LinesReaderObservable(in)
    source
  }

  /** Given a `org.reactivestreams.Publisher`, converts it into a
    * Monix / Rx Observable.
    *
    * See the [[http://www.reactive-streams.org/ Reactive Streams]]
    * protocol that Monix implements.
    *
    * @see [[Observable.toReactive]] for converting an `Observable` to
    *      a reactive publisher.
    *
    * @param publisher is the `org.reactivestreams.Publisher` reference to
    *        wrap into an [[Observable]]
    */
  def fromReactivePublisher[A](publisher: RPublisher[A]): Observable[A] = {
    // NOTE(review): a request count of 0 is passed here, while the two-argument
    // overload documents a strictly positive count. Presumably the builder treats
    // 0 as "use the default" -- confirm against `ReactiveObservable`.
    val source: Observable[A] = new builders.ReactiveObservable[A](publisher, 0)
    source
  }

  /** Given a `org.reactivestreams.Publisher`, converts it into a
    * Monix / Rx Observable.
    *
    * See the [[http://www.reactive-streams.org/ Reactive Streams]]
    * protocol that Monix implements.
    *
    * @see [[Observable.toReactive]] for converting an `Observable` to
    *      a reactive publisher.
    *
    * @param publisher is the `org.reactivestreams.Publisher` reference to
    *        wrap into an [[Observable]]
    *
    * @param requestCount a strictly positive number, representing the size
    *        of the buffer used and the number of elements requested on each
    *        cycle when communicating demand, compliant with the
    *        reactive streams specification. If `Int.MaxValue` is given,
    *        then no back-pressuring logic will be applied (e.g. an unbounded
    *        buffer is used and the source has a license to stream as many
    *        events as it wants).
    */
  def fromReactivePublisher[A](publisher: RPublisher[A], requestCount: Int): Observable[A] = {
    // Delegates to the internal `ReactiveObservable` builder with the caller's request count.
    val source: Observable[A] = new builders.ReactiveObservable[A](publisher, requestCount)
    source
  }

  /** Transforms a non-strict [[monix.eval.Coeval Coeval]] value
    * into an `Observable` that emits a single element.
    */
  def coeval[A](value: Coeval[A]): Observable[A] =
    value match {
      // Already-computed success: emit it directly.
      case Coeval.Now(result) => Observable.now(result)
      // Already-computed failure: signal the error directly.
      case Coeval.Error(failure) => Observable.raiseError(failure)
      // Anything else is evaluated via `Observable.eval`.
      case pending => Observable.eval(pending.value())
    }

  /** Converts a Scala `Try` into an `Observable`.
    *
    * {{{
    *   import scala.util.Try
    *
    *   val value = Try(1)
    *   Observable.fromTry(value)
    * }}}
    */
  def fromTry[A](a: Try[A]): Observable[A] =
    a match {
      // A failed `Try` becomes an observable that raises the same error.
      case Failure(error) => Observable.raiseError(error)
      // A successful `Try` becomes an observable holding its value.
      case Success(value) => Observable.now(value)
    }

  /** Builds an `Observable` instance out of a Scala `Either`.
    */
  def fromEither[E <: Throwable, A](a: Either[E, A]): Observable[A] =
    // `Left` raises the contained error, `Right` emits the contained value.
    a.fold[Observable[A]](
      error => Observable.raiseError(error),
      value => Observable.now(value)
    )

  /** Builds an [[Observable]] instance out of a Scala `Either`, using `f`
    * to convert a `Left` value into the `Throwable` that gets raised.
    */
  def fromEither[E, A](f: E => Throwable)(a: Either[E, A]): Observable[A] =
    // `Left` is mapped through `f` and raised, `Right` emits the contained value.
    a.fold[Observable[A]](
      left => Observable.raiseError(f(left)),
      value => Observable.now(value)
    )

  /** Converts the provided Scala `Future` into an [[Observable]].
    *
    * If the created instance is a
    * [[monix.execution.CancelableFuture CancelableFuture]],
    * then it will be used for the returned
    * [[monix.execution.Cancelable Cancelable]] on `subscribe`.
    */
  def fromFuture[A](factory: => Future[A]): Observable[A] = {
    // `factory` is forwarded as an expression; it is not evaluated into a local here.
    val source: Observable[A] = new builders.FutureAsObservable(factory)
    source
  }

  /** Converts generic `F[_]` effects to `Observable`.
    *
    * Currently supported data types:
    *
    *  - [[monix.eval.Task]]
    *  - [[monix.eval.Coeval]]
    *  - [[scala.concurrent.Future]]
    *  - [[https://typelevel.org/cats-effect/datatypes/io.html cats.effect.IO]]
    *  - any [[https://typelevel.org/cats-effect/typeclasses/effect.html cats.effect.Effect]]
    *  - any [[https://typelevel.org/cats-effect/typeclasses/concurrent-effect.html cats.effect.ConcurrentEffect]]
    *
    *  Sample:
    *
    *  {{{
    *    import cats.implicits._
    *    import cats.effect.IO
    *    import cats.effect.Timer
    *    import scala.concurrent.duration._
    *    import monix.execution.Scheduler.global
    *    import monix.catnap.SchedulerEffect
    *
    *    // Needed for IO.sleep
    *    implicit val timer: Timer[IO] = SchedulerEffect.timerLiftIO[IO](global)
    *    val task = IO.sleep(5.seconds) *> IO(println("Hello!"))
    *
    *    Observable.fromTaskLike(task)
    *  }}}
    */
  def fromTaskLike[F[_], A](fa: F[A])(implicit F: TaskLike[F]): Observable[A] = {
    // Convert the generic effect to a `Task`, then reuse the `Task` builder.
    val asTask: Task[A] = F(fa)
    fromTask(asTask)
  }

  /** Converts any [[monix.eval.Task Task]] into an [[Observable]].
    *
    * {{{
    *   import monix.eval.Task
    *
    *   val task = Task.eval("Hello!")
    *
    *   Observable.fromTask(task)
    * }}}
    */
  def fromTask[A](task: Task[A]): Observable[A] = {
    // Delegates to the internal `TaskAsObservable` builder.
    val source: Observable[A] = new builders.TaskAsObservable(task)
    source
  }

  /** Returns a `F ~> Observable` (`FunctionK`) for transforming any
    * supported data-type into [[Observable]].
    */
  def liftFrom[F[_]](implicit F: ObservableLike[F]): F ~> Observable = {
    // The `ObservableLike[F]` instance is itself returned as the `F ~> Observable` transformation.
    val transformation: F ~> Observable = F
    transformation
  }

  /** Alias for [[defer]]. */
  def suspend[A](fa: => Observable[A]): Observable[A] = {
    // Pure alias: forwards the by-name argument to `defer` untouched.
    Observable.defer(fa)
  }

  /** Returns a new observable that creates a sequence from the
    * given factory on each subscription.
    */
  def defer[A](fa: => Observable[A]): Observable[A] = {
    // Capture the by-name argument in a thunk and hand it to the builder.
    val factory: () => Observable[A] = () => fa
    new builders.DeferObservable(factory)
  }

  /** Builds a new observable from a strict `head` and a lazily
    * evaluated tail.
    */
  def cons[A](head: A, tail: Observable[A]): Observable[A] = {
    // Delegates to the internal `ConsObservable` builder.
    val source: Observable[A] = new builders.ConsObservable[A](head, tail)
    source
  }

  /** Creates a new observable from the two given observables
    * by interleaving their items into a strictly alternating sequence.
    *
    * So the first item emitted by the new observable will be the item emitted by
    * `oa1`, the second item will be emitted by `oa2`, and so forth;
    * when either `oa1` or `oa2` calls `onComplete`, the items will then be
    * directly coming from the observable that has not completed; when `onError` is
    * called by either `oa1` or `oa2`, the new observable will call `onError` and halt.
    *
    * See [[Observable!.merge merge]] for a more relaxed alternative that doesn't
    * emit items in strict alternating sequence.
    */
  def interleave2[A](oa1: Observable[A], oa2: Observable[A]): Observable[A] = {
    // Delegates to the internal `Interleave2Observable` builder.
    val source: Observable[A] = new builders.Interleave2Observable(oa1, oa2)
    source
  }

  /** Creates an Observable that emits auto-incremented natural numbers
    * (longs) spaced by a given time interval. Starts from 0 with `initialDelay`,
    * after which it emits incremented numbers spaced by the
    * `delay` of time. The given `delay` of time acts as a fixed
    * delay between successive events.
    *
    * This version of the `intervalWithFixedDelay` allows specifying an
    * `initialDelay` before events start being emitted.
    *
    * @param initialDelay is the delay to wait before emitting the first event
    * @param delay the time to wait between 2 successive events
    */
  def intervalWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration): Observable[Long] = {
    // Delegates to the internal `IntervalFixedDelayObservable` builder.
    val source: Observable[Long] = new builders.IntervalFixedDelayObservable(initialDelay, delay)
    source
  }

  /** Creates an Observable that emits auto-incremented natural numbers
    * (longs) spaced by a given time interval. Starts from 0 with no
    * delay, after which it emits incremented numbers spaced by the
    * `delay` of time. The given `delay` of time acts as a fixed
    * delay between successive events.
    *
    * @param delay the delay between 2 successive events
    */
  def interval(delay: FiniteDuration): Observable[Long] = {
    // Alias for the single-argument `intervalWithFixedDelay`.
    Observable.intervalWithFixedDelay(delay)
  }

  /** Creates an Observable that emits auto-incremented natural numbers
    * (longs) spaced by a given time interval. Starts from 0 with no
    * delay, after which it emits incremented numbers spaced by the
    * `delay` of time. The given `delay` of time acts as a fixed
    * delay between successive events.
    *
    * @param delay the delay between 2 successive events
    */
  def intervalWithFixedDelay(delay: FiniteDuration): Observable[Long] = {
    // This overload uses a zero initial delay.
    val noInitialDelay: FiniteDuration = Duration.Zero
    new builders.IntervalFixedDelayObservable(noInitialDelay, delay)
  }

  /** Creates an Observable that emits auto-incremented natural numbers
    * (longs) at a fixed rate, as given by the specified `period`. The
    * time it takes to process an `onNext` event gets subtracted from
    * the specified `period` and thus the created observable tries to
    * emit events spaced by the given time interval, regardless of how
    * long the processing of `onNext` takes.
    *
    * @param period the period between 2 successive `onNext` events
    */
  def intervalAtFixedRate(period: FiniteDuration): Observable[Long] = {
    // This overload uses a zero initial delay.
    val noInitialDelay: FiniteDuration = Duration.Zero
    new builders.IntervalFixedRateObservable(noInitialDelay, period)
  }

  /** Creates an Observable that emits auto-incremented natural numbers
    * (longs) at a fixed rate, as given by the specified `period`. The
    * time it takes to process an `onNext` event gets subtracted from
    * the specified `period` and thus the created observable tries to
    * emit events spaced by the given time interval, regardless of how
    * long the processing of `onNext` takes.
    *
    * This version of the `intervalAtFixedRate` allows specifying an
    * `initialDelay` before events start being emitted.
    *
    * @param initialDelay is the initial delay before emitting the first event
    * @param period the period between 2 successive `onNext` events
    */
  def intervalAtFixedRate(initialDelay: FiniteDuration, period: FiniteDuration): Observable[Long] = {
    // Delegates to the internal `IntervalFixedRateObservable` builder.
    val source: Observable[Long] = new builders.IntervalFixedRateObservable(initialDelay, period)
    source
  }

  /** Creates an Observable that repeatedly emits the given elements.
    */
  def repeat[A](elems: A*): Observable[A] = {
    // Forward the varargs sequence as-is to the internal `RepeatObservable` builder.
    val source: Observable[A] = new builders.RepeatObservable(elems: _*)
    source
  }

  /** Repeats the execution of the given `task`, emitting
    * the results indefinitely.
    */
  def repeatEval[A](task: => A): Observable[A] = {
    // `task` is forwarded as an expression; it is not evaluated into a local here.
    val source: Observable[A] = new builders.RepeatEvalObservable(task)
    source
  }

  /** Repeats the evaluation of given effectful value, emitting
    * the results indefinitely.
    */
  def repeatEvalF[F[_], A](fa: F[A])(implicit F: TaskLike[F]): Observable[A] = {
    // A repeating stream of unit values drives the evaluation of `fa` per element.
    val ticks: Observable[Unit] = repeat(())
    ticks.mapEvalF(_ => fa)(F)
  }

  /** Creates an Observable that emits items in the given range.
    *
    * @param from the range start
    * @param until the range end
    * @param step increment step, either positive or negative
    */
  def range(from: Long, until: Long, step: Long = 1L): Observable[Long] = {
    // Delegates to the internal `RangeObservable` builder.
    val source: Observable[Long] = new builders.RangeObservable(from, until, step)
    source
  }

  /** Given an initial state and a generator function that produces the
    * next state and the next element in the sequence, creates an
    * observable that keeps generating elements produced by our
    * generator function.
    */
  def fromStateAction[S, A](f: S => (A, S))(seed: => S): Observable[A] = {
    // The builder takes the seed first and the generator second.
    val source: Observable[A] = new builders.StateActionObservable(seed, f)
    source
  }

  /** Given an initial state and a generator function that produces the
    * next state and the next element in the sequence, creates an
    * observable that keeps generating elements produced by our
    * generator function until `None` is returned.
    * @example {{{
    *  Observable.unfold(0)(i => if (i < 10) Some((i, i + 1)) else None).toListL
    *
    *  result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    *  }}}
    *
    *  @see [[paginate]] for a way to return one more value when generator returns `None`
    */
  def unfold[S, A](seed: => S)(f: S => Option[(A, S)]): Observable[A] = {
    // Delegates to the `UnfoldObservable` builder.
    val source: Observable[A] = new UnfoldObservable(seed, f)
    source
  }

  /** Given an initial state and a generator function that produces the
    * next state and the next element in the sequence, creates an
    * observable that keeps generating elements produced by our
    * generator function until `None` is returned.
    * @example {{{
    *  Observable.unfoldEval(0)(i => if (i < 10) Task.now(Some((i, i + 1))) else Task.now(None)).toListL
    *
    *  result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    *  }}}
    *
    *  @see [[paginateEval]] for a way to return one more value when generator returns `None`
    */
  def unfoldEval[S, A](seed: => S)(f: S => Task[Option[(A, S)]]): Observable[A] = {
    // Delegates to the `UnfoldEvalObservable` builder.
    val source: Observable[A] = new UnfoldEvalObservable(seed, f)
    source
  }

  /** Similar to [[unfold]], but allows to take emission one step further.
    * @example {{{
    *  Observable.paginate(0)(i => if (i < 10) (i, Some(i + 1)) else (i, None)).toListL
    *
    *  result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    *  }}}
    */
  def paginate[S, A](seed: => S)(f: S => (A, Option[S])): Observable[A] = {
    // Delegates to the `PaginateObservable` builder.
    val source: Observable[A] = new PaginateObservable(seed, f)
    source
  }

  /** Similar to [[unfoldEval]], but allows to take emission one step further.
    * @example {{{
    *  Observable.paginateEval(0)(i => if (i < 10) Task.now((i, Some(i + 1))) else Task.now((i,None))).toListL
    *
    *  result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    *  }}}
    */
  def paginateEval[S, A](seed: => S)(f: S => Task[(A, Option[S])]): Observable[A] = {
    // Delegates to the `PaginateEvalObservable` builder.
    val source: Observable[A] = new PaginateEvalObservable(seed, f)
    source
  }

  /** Version of [[unfoldEval]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * @see [[unfoldEval]] for a version specialized for
    *      [[monix.eval.Task Task]]
    */
  def unfoldEvalF[F[_], S, A](seed: => S)(f: S => F[Option[(A, S)]])(implicit F: TaskLike[F]): Observable[A] = {
    // Adapt the generic generator into a `Task`-returning one, then reuse `unfoldEval`.
    val step: S => Task[Option[(A, S)]] = state => Task.from(f(state))(F)
    unfoldEval(seed)(step)
  }

  /** Given an initial state and a generator function that produces the
    * next state and the next element in the sequence, creates an
    * observable that keeps generating elements produced by our
    * generator function.
    */
  def fromAsyncStateAction[S, A](f: S => Task[(A, S)])(seed: => S): Observable[A] = {
    // The builder takes the seed first and the generator second.
    val source: Observable[A] = new builders.AsyncStateActionObservable(seed, f)
    source
  }

  /** Version of [[fromAsyncStateAction]] that can work with generic
    * `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
    * conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    *
    * @see [[fromAsyncStateAction]] for a version specialized for
    *      [[monix.eval.Task Task]]
    */
  def fromAsyncStateActionF[F[_], S, A](f: S => F[(A, S)])(seed: => S)(implicit F: TaskLike[F]): Observable[A] = {
    // Adapt the generic generator into a `Task`-returning one, then reuse `fromAsyncStateAction`.
    val step: S => Task[(A, S)] = state => Task.from(f(state))(F)
    fromAsyncStateAction[S, A](step)(seed)
  }

  /** Wraps the given `source` Observable into a `org.reactivestreams.Publisher`.
    * See the [[http://www.reactive-streams.org/ Reactive Streams]]
    * protocol that Monix implements.
    */
  def toReactive[A](source: Observable[A])(implicit s: Scheduler): RPublisher[A] = {
    // Delegates to the instance method, passing the scheduler explicitly.
    val publisher: RPublisher[A] = source.toReactivePublisher[A](s)
    publisher
  }

  /** Creates an Observable that repeatedly emits the given `unit` value, until
    * the underlying Observer cancels.
    */
  def timerRepeated[A](initialDelay: FiniteDuration, period: FiniteDuration, unit: A): Observable[A] = {
    // Delegates to the internal `RepeatedValueObservable` builder.
    val source: Observable[A] = new builders.RepeatedValueObservable[A](initialDelay, period, unit)
    source
  }

  /** Creates a new observable from two observable sequences
    * by combining their items in pairs in a strict sequence.
    *
    * So the first item emitted by the new observable will be the result
    * of the function applied to the first items emitted by each of
    * the source observables; the second item emitted by the new observable
    * will be the result of the function applied to the second items
    * emitted by each of those observables; and so forth.
    *
    * == Visual Example ==
    *
    * <pre>
    * stream1: 1 - - 2 - - 3 - 4 - -
    * stream2: 1 - - 2 - 3 - - - - 4
    *
    * result: (1, 1), (2, 2), (3, 3), (4, 4)
    * </pre>
    *
    * See [[combineLatestMap2]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    */
  def zip2[A1, A2](oa1: Observable[A1], oa2: Observable[A2]): Observable[(A1, A2)] =
    // Tupling is `zipMap2` with the pair constructor as the mapping function.
    zipMap2(oa1, oa2)((first, second) => (first, second))

  /** Creates a new observable from two observable sequences
    * by combining their items in pairs in a strict sequence.
    *
    * So the first item emitted by the new observable will be the result
    * of the function applied to the first items emitted by each of
    * the source observables; the second item emitted by the new observable
    * will be the result of the function applied to the second items
    * emitted by each of those observables; and so forth.
    *
    * == Visual Example ==
    *
    * <pre>
    * stream1: 1 - - 2 - - 3 - 4 - -
    * stream2: 1 - - 2 - 3 - - - - 4
    *
    * result: (1, 1), (2, 2), (3, 3), (4, 4)
    * </pre>
    *
    * See [[combineLatestMap2]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    *
    * @param f is the mapping function applied over the generated pairs
    */
  def zipMap2[A1, A2, R](oa1: Observable[A1], oa2: Observable[A2])(f: (A1, A2) => R): Observable[R] = {
    // Each output is `f` applied to the items sharing the same position in both sources.
    val zipped: Observable[R] = new builders.Zip2Observable[A1, A2, R](oa1, oa2)(f)
    zipped
  }

  /** Creates a new observable from three observable sequences
    * by combining their items, position by position, in a strict sequence.
    *
    * So the first item emitted by the new observable will be the result
    * of the function applied to the first items emitted by each of
    * the source observables; the second item emitted by the new observable
    * will be the result of the function applied to the second items
    * emitted by each of those observables; and so forth.
    *
    * See [[combineLatestMap3]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    */
  def zip3[A1, A2, A3](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3]): Observable[(A1, A2, A3)] =
    // Tupling is `zipMap3` with the triple constructor as the mapping function.
    zipMap3(oa1, oa2, oa3)((v1, v2, v3) => (v1, v2, v3))

  /** Creates a new observable from three observable sequences
    * by combining their items, position by position, in a strict sequence.
    *
    * So the first item emitted by the new observable will be the result
    * of the function applied to the first items emitted by each of
    * the source observables; the second item emitted by the new observable
    * will be the result of the function applied to the second items
    * emitted by each of those observables; and so forth.
    *
    * See [[combineLatestMap3]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    *
    * @param f is the mapping function applied over the generated pairs
    */
  def zipMap3[A1, A2, A3, R](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3])(
    f: (A1, A2, A3) => R
  ): Observable[R] = {
    // Each output is `f` applied to the items sharing the same position in all three sources.
    val zipped: Observable[R] = new builders.Zip3Observable(oa1, oa2, oa3)(f)
    zipped
  }

  /** Creates a new observable from four observable sequences
    * by combining their items, position by position, in a strict sequence.
    *
    * So the first item emitted by the new observable will be the result
    * of the function applied to the first items emitted by each of
    * the source observables; the second item emitted by the new observable
    * will be the result of the function applied to the second items
    * emitted by each of those observables; and so forth.
    *
    * See [[combineLatestMap4]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    */
  def zip4[A1, A2, A3, A4](
    oa1: Observable[A1],
    oa2: Observable[A2],
    oa3: Observable[A3],
    oa4: Observable[A4]
  ): Observable[(A1, A2, A3, A4)] =
    // Tupling is `zipMap4` with the 4-tuple constructor as the mapping function.
    zipMap4(oa1, oa2, oa3, oa4)((v1, v2, v3, v4) => (v1, v2, v3, v4))

  /** Creates a new observable from four observable sequences
    * by combining their items, position by position, in a strict sequence.
    *
    * So the first item emitted by the new observable will be the result
    * of the function applied to the first items emitted by each of
    * the source observables; the second item emitted by the new observable
    * will be the result of the function applied to the second items
    * emitted by each of those observables; and so forth.
    *
    * See [[combineLatestMap4]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    *
    * @param f is the mapping function applied over the generated pairs
    */
  def zipMap4[A1, A2, A3, A4, R](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4])(
    f: (A1, A2, A3, A4) => R
  ): Observable[R] = {
    // Each output is `f` applied to the items sharing the same position in all four sources.
    val zipped: Observable[R] = new builders.Zip4Observable(oa1, oa2, oa3, oa4)(f)
    zipped
  }

  /** Creates a new observable from five observable sequences
    * by combining their items, position by position, in a strict sequence.
    *
    * So the first item emitted by the new observable will be the result
    * of the function applied to the first items emitted by each of
    * the source observables; the second item emitted by the new observable
    * will be the result of the function applied to the second items
    * emitted by each of those observables; and so forth.
    *
    * See [[combineLatestMap5]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    */
  def zip5[A1, A2, A3, A4, A5](
    oa1: Observable[A1],
    oa2: Observable[A2],
    oa3: Observable[A3],
    oa4: Observable[A4],
    oa5: Observable[A5]
  ): Observable[(A1, A2, A3, A4, A5)] =
    // Tupling is `zipMap5` with the 5-tuple constructor as the mapping function.
    zipMap5(oa1, oa2, oa3, oa4, oa5)((v1, v2, v3, v4, v5) => (v1, v2, v3, v4, v5))

  /** Creates a new observable from five observable sequences
    * by combining their items, position by position, in a strict sequence.
    *
    * So the first item emitted by the new observable will be the result
    * of the function applied to the first items emitted by each of
    * the source observables; the second item emitted by the new observable
    * will be the result of the function applied to the second items
    * emitted by each of those observables; and so forth.
    *
    * See [[combineLatestMap5]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    *
    * @param f is the mapping function applied over the generated pairs
    */
  def zipMap5[A1, A2, A3, A4, A5, R](
    oa1: Observable[A1],
    oa2: Observable[A2],
    oa3: Observable[A3],
    oa4: Observable[A4],
    oa5: Observable[A5]
  )(f: (A1, A2, A3, A4, A5) => R): Observable[R] = {
    // Each output is `f` applied to the items sharing the same position in all five sources.
    val zipped: Observable[R] = new builders.Zip5Observable(oa1, oa2, oa3, oa4, oa5)(f)
    zipped
  }

  /** Creates a new observable from six observable sequences
    * by combining their items, position by position, in a strict sequence.
    *
    * So the first item emitted by the new observable will be the result
    * of the function applied to the first items emitted by each of
    * the source observables; the second item emitted by the new observable
    * will be the result of the function applied to the second items
    * emitted by each of those observables; and so forth.
    *
    * See [[combineLatestMap6]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    */
  def zip6[A1, A2, A3, A4, A5, A6](
    oa1: Observable[A1],
    oa2: Observable[A2],
    oa3: Observable[A3],
    oa4: Observable[A4],
    oa5: Observable[A5],
    oa6: Observable[A6]
  ): Observable[(A1, A2, A3, A4, A5, A6)] =
    // Tupling is `zipMap6` with the 6-tuple constructor as the mapping function.
    zipMap6(oa1, oa2, oa3, oa4, oa5, oa6)((v1, v2, v3, v4, v5, v6) => (v1, v2, v3, v4, v5, v6))

  /** Creates a new observable from six observable sequences
    * by combining their items, position by position, in a strict sequence.
    *
    * So the first item emitted by the new observable will be the result
    * of the function applied to the first items emitted by each of
    * the source observables; the second item emitted by the new observable
    * will be the result of the function applied to the second items
    * emitted by each of those observables; and so forth.
    *
    * See [[combineLatestMap6]] for a more relaxed alternative that doesn't
    * combine items in strict sequence.
    *
    * @param f is the mapping function applied over the generated tuples
    */
  def zipMap6[A1, A2, A3, A4, A5, A6, R](
    oa1: Observable[A1],
    oa2: Observable[A2],
    oa3: Observable[A3],
    oa4: Observable[A4],
    oa5: Observable[A5],
    oa6: Observable[A6]
  )(f: (A1, A2, A3, A4, A5, A6) => R): Observable[R] = {
    // Each output is `f` applied to the items sharing the same position in all six sources.
    val zipped: Observable[R] = new builders.Zip6Observable(oa1, oa2, oa3, oa4, oa5, oa6)(f)
    zipped
  }

  /** Given a sequence of observables, it [[Observable!.zip zips]] them
    * together, returning a new observable that emits sequences holding
    * one item from each source. Returns an empty observable when no
    * sources are given.
    */
  def zipList[A](sources: Observable[A]*): Observable[Seq[A]] =
    sources.headOption match {
      // Nothing to zip: the result completes without emitting.
      case None =>
        Observable.empty
      case Some(first) =>
        // Start from one-element vectors, then append one item per remaining source.
        val initial = first.map(item => Vector(item))
        sources.tail.foldLeft(initial) { (zipped, next) =>
          zipped.zipMap(next)((collected, item) => collected :+ item)
        }
    }

  /** Creates an observable that doesn't emit anything, but immediately
    * calls `onComplete` instead.
    */
  def empty[A]: Observable[A] = {
    // No allocation here: the same `EmptyObservable` value is returned for every `A`.
    builders.EmptyObservable
  }

  /** Creates an `Observable` that depends on a resource allocated by a
    * monadic value, ensuring the resource is released.
    *
    * Typical use-cases are working with files or network sockets
    *
    * ==Example==
    *
    * {{{
    *   import monix.eval.Task
    *   import java.io.PrintWriter
    *
    *   val printer =
    *     Observable.resource {
    *       Task(new PrintWriter("./lines.txt"))
    *     } { writer =>
    *       Task(writer.close())
    *     }
    *
    *   // Safely use the resource, because the release is
    *   // scheduled to happen afterwards
    *   val writeLines = printer.flatMap { writer =>
    *     Observable
    *       .fromIterator(Task(Iterator.from(1)))
    *       .mapEval(i => Task { writer.println(s"Line #\\$i") })
    *   }
    *
    *   // Write 100 numbered lines to the file, closing the writer
    *   // when finished (after `runAsync`):
    *   writeLines.take(100).completedL
    * }}}
    *
    * @param acquire resource to acquire at the start of the stream
    * @param release function that releases the acquired resource
    */
  def resource[A](acquire: Task[A])(release: A => Task[Unit]): Observable[A] = {
    // The exit case is ignored: `release` is invoked the same way for every outcome.
    val ignoringExit: (A, ExitCase[Throwable]) => Task[Unit] =
      (acquired, _) => release(acquired)
    resourceCase(acquire)(ignoringExit)
  }

  /** Version of [[resource]] that can work with generic `F[_]` tasks,
    * anything that's supported via [[monix.eval.TaskLike]] conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  def resourceF[F[_], A](acquire: F[A])(release: A => F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
    // Convert both effects to `Task` through the `TaskLike` instance, then delegate.
    val acquireTask: Task[A] = F(acquire)
    resource(acquireTask)(acquired => F(release(acquired)))
  }

  /** Creates a stream that depends on a resource allocated by a
    * monadic value, ensuring the resource is released.
    *
    * Typical use-cases are working with files or network sockets
    *
    * ==Example==
    *
    * {{{
    *   import cats.effect.ExitCase
    *   import monix.eval.Task
    *   import java.io.PrintWriter
    *
    *   val printer =
    *     Observable.resourceCase {
    *       Task(new PrintWriter("./lines.txt"))
    *     } {
    *       case (writer, ExitCase.Canceled | ExitCase.Completed) =>
    *         Task(writer.close())
    *       case (writer, ExitCase.Error(e)) =>
    *         Task { println(e.getMessage); writer.close() }
    *     }
    *
    *   // Safely use the resource, because the release is
    *   // scheduled to happen afterwards
    *   val writeLines = printer.flatMap { writer =>
    *     Observable
    *       .fromIterator(Task(Iterator.from(1)))
    *       .mapEval(i => Task { writer.println(s"Line #\\$i") })
    *   }
    *
    *   // Write 100 numbered lines to the file, closing the writer
    *   // when finished (after `runAsync`):
    *   writeLines.take(100).completedL
    * }}}
    *
    * @param acquire an effect that acquires an expensive resource
    * @param release function that releases the acquired resource
    */
  def resourceCase[A](acquire: Task[A])(release: (A, ExitCase[Throwable]) => Task[Unit]): Observable[A] = {
    // Both effects are forwarded untouched to the builder.
    val managed: Observable[A] = new ResourceCaseObservable(acquire, release)
    managed
  }

  /** Version of [[resourceCase]] that can work with generic `F[_]` tasks,
    * anything that's supported via [[monix.eval.TaskLike]] conversions.
    *
    * So you can work among others with:
    *
    *  - `cats.effect.IO`
    *  - `monix.eval.Coeval`
    *  - `scala.concurrent.Future`
    *  - ...
    */
  def resourceCaseF[F[_], A](acquire: F[A])(release: (A, ExitCase[Throwable]) => F[Unit])(implicit
    F: TaskLike[F]
  ): Observable[A] = {
    // Convert both effects to `Task` through the `TaskLike` instance, then delegate.
    val acquireTask: Task[A] = F(acquire)
    resourceCase(acquireTask)((acquired, exit) => F(release(acquired, exit)))
  }

  /** Creates a combined observable from 2 source observables.
    *
    * This operator behaves in a similar way to [[zip2]],
    * but while `zip` emits items only when all of the zipped source
    * observables have emitted a previously unzipped item, `combine`
    * emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    *
    * == Visual Example ==
    *
    * <pre>
    * stream1: 1 - - 2 - - 3 - 4 - -
    * stream2: 1 - - 2 - 3 - - - - 4
    *
    * result: (1, 1), (2, 2), (2, 3), (3, 3), (4, 3), (4, 4)
    * </pre>
    */
  def combineLatest2[A1, A2](oa1: Observable[A1], oa2: Observable[A2]): Observable[(A1, A2)] =
    // Tupling is `combineLatestMap2` with the pair constructor as the mapping function.
    combineLatestMap2(oa1, oa2)((first, second) => (first, second))

  /** Creates a combined observable from 2 source observables.
    *
    * This operator behaves in a similar way to [[zipMap2]],
    * but while `zip` emits items only when all of the zipped source
    * observables have emitted a previously unzipped item, `combine`
    * emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    *
    * == Visual Example ==
    *
    * <pre>
    * stream1: 1 - - 2 - - 3 - 4 - -
    * stream2: 1 - - 2 - 3 - - - - 4
    *
    * result: (1, 1), (2, 2), (2, 3), (3, 3), (4, 3), (4, 4)
    * </pre>
    */
  def combineLatestMap2[A1, A2, R](oa1: Observable[A1], oa2: Observable[A2])(f: (A1, A2) => R): Observable[R] = {
    // Emits `f` of the latest items whenever either source emits, once both have emitted.
    val combined: Observable[R] = new builders.CombineLatest2Observable[A1, A2, R](oa1, oa2)(f)
    combined
  }

  /** Creates a combined observable from 3 source observables.
    *
    * This operator behaves in a similar way to [[zip3]],
    * but while `zip` emits items only when all of the zipped source
    * observables have emitted a previously unzipped item, `combine`
    * emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    */
  def combineLatest3[A1, A2, A3](
    oa1: Observable[A1],
    oa2: Observable[A2],
    oa3: Observable[A3]
  ): Observable[(A1, A2, A3)] =
    // Tupling is `combineLatestMap3` with the triple constructor as the mapping function.
    combineLatestMap3(oa1, oa2, oa3)((v1, v2, v3) => (v1, v2, v3))

  /** Creates a combined observable from 3 source observables.
    *
    * This operator behaves in a similar way to [[zipMap3]],
    * but while `zip` emits items only when all of the zipped source
    * observables have emitted a previously unzipped item, `combine`
    * emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    */
  def combineLatestMap3[A1, A2, A3, R](a1: Observable[A1], a2: Observable[A2], a3: Observable[A3])(
    f: (A1, A2, A3) => R
  ): Observable[R] = {
    // Emits `f` of the latest items whenever any source emits, once all three have emitted.
    val combined: Observable[R] = new builders.CombineLatest3Observable[A1, A2, A3, R](a1, a2, a3)(f)
    combined
  }

  /** Creates a combined observable from 4 source observables.
    *
    * This operator behaves in a similar way to [[zip4]],
    * but while `zip` emits items only when all of the zipped source
    * observables have emitted a previously unzipped item, `combine`
    * emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    */
  def combineLatest4[A1, A2, A3, A4](
    oa1: Observable[A1],
    oa2: Observable[A2],
    oa3: Observable[A3],
    oa4: Observable[A4]
  ): Observable[(A1, A2, A3, A4)] =
    // Tupling is `combineLatestMap4` with the 4-tuple constructor as the mapping function.
    combineLatestMap4(oa1, oa2, oa3, oa4)((v1, v2, v3, v4) => (v1, v2, v3, v4))

  /** Creates a combined observable from 4 source observables.
    *
    * This operator behaves in a similar way to [[zipMap4]],
    * but while `zip` emits items only when all of the zipped source
    * observables have emitted a previously unzipped item, `combine`
    * emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    */
  def combineLatestMap4[A1, A2, A3, A4, R](
    a1: Observable[A1],
    a2: Observable[A2],
    a3: Observable[A3],
    a4: Observable[A4]
  )(f: (A1, A2, A3, A4) => R): Observable[R] = {
    // Emits `f` of the latest items whenever any source emits, once all four have emitted.
    val combined: Observable[R] =
      new builders.CombineLatest4Observable[A1, A2, A3, A4, R](a1, a2, a3, a4)(f)
    combined
  }

  /** Creates a combined observable from 5 source observables.
    *
    * This operator behaves in a similar way to [[zip5]],
    * but while `zip` emits items only when all of the zipped source
    * observables have emitted a previously unzipped item, `combine`
    * emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    */
  def combineLatest5[A1, A2, A3, A4, A5](
    oa1: Observable[A1],
    oa2: Observable[A2],
    oa3: Observable[A3],
    oa4: Observable[A4],
    oa5: Observable[A5]
  ): Observable[(A1, A2, A3, A4, A5)] =
    // Tupling is `combineLatestMap5` with the 5-tuple constructor as the mapping function.
    combineLatestMap5(oa1, oa2, oa3, oa4, oa5)((v1, v2, v3, v4, v5) => (v1, v2, v3, v4, v5))

  /** Creates a combined observable from 5 source observables.
    *
    * This operator behaves in a similar way to [[zipMap5]],
    * but while `zip` emits items only when all of the zipped source
    * observables have emitted a previously unzipped item, `combine`
    * emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    */
  def combineLatestMap5[A1, A2, A3, A4, A5, R](
    a1: Observable[A1],
    a2: Observable[A2],
    a3: Observable[A3],
    a4: Observable[A4],
    a5: Observable[A5]
  )(f: (A1, A2, A3, A4, A5) => R): Observable[R] = {
    // Emits `f` of the latest items whenever any source emits, once all five have emitted.
    val combined: Observable[R] =
      new builders.CombineLatest5Observable[A1, A2, A3, A4, A5, R](a1, a2, a3, a4, a5)(f)
    combined
  }

  /** Creates a combined observable from 6 source observables.
    *
    * This operator behaves in a similar way to [[zip6]],
    * but while `zip` emits items only when all of the zipped source
    * observables have emitted a previously unzipped item, `combine`
    * emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    */
  def combineLatest6[A1, A2, A3, A4, A5, A6](
    oa1: Observable[A1],
    oa2: Observable[A2],
    oa3: Observable[A3],
    oa4: Observable[A4],
    oa5: Observable[A5],
    oa6: Observable[A6]
  ): Observable[(A1, A2, A3, A4, A5, A6)] =
    // Tupling is `combineLatestMap6` with the 6-tuple constructor as the mapping function.
    combineLatestMap6(oa1, oa2, oa3, oa4, oa5, oa6) { (v1, v2, v3, v4, v5, v6) =>
      (v1, v2, v3, v4, v5, v6)
    }

  /** Creates a combined observable from 6 source observables.
    *
    * This operator behaves in a similar way to [[zipMap6]],
    * but while `zip` emits items only when all of the zipped source
    * observables have emitted a previously unzipped item, `combine`
    * emits an item whenever any of the source Observables emits an
    * item (so long as each of the source Observables has emitted at
    * least one item).
    */
  def combineLatestMap6[A1, A2, A3, A4, A5, A6, R](
    a1: Observable[A1],
    a2: Observable[A2],
    a3: Observable[A3],
    a4: Observable[A4],
    a5: Observable[A5],
    a6: Observable[A6]
  )(f: (A1, A2, A3, A4, A5, A6) => R): Observable[R] = {
    // Emits `f` of the latest items whenever any source emits, once all six have emitted.
    val combined: Observable[R] =
      new builders.CombineLatest6Observable[A1, A2, A3, A4, A5, A6, R](a1, a2, a3, a4, a5, a6)(f)
    combined
  }

  /** Given a sequence of observables, it combines them together,
    * returning a new observable that emits sequences. Returns an
    * empty observable when no sources are given.
    */
  def combineLatestList[A](sources: Observable[A]*): Observable[Seq[A]] =
    // Without sources there is nothing to combine, so fall back to the empty observable.
    if (sources.nonEmpty) new CombineLatestListObservable[A](sources)
    else Observable.empty

  /** Given a sequence of priority/observable pairs, combines them into a new
    * observable that eagerly emits source items downstream as soon as demand is
    * signaled, choosing the item from the highest priority (greater numbers
    * mean higher priority) source when items from multiple sources are
    * available. If items are available from multiple sources with the same
    * highest priority, one of them is chosen arbitrarily.
    *
    * Source items are buffered only to the extent necessary to accommodate
    * backpressure from downstream, and thus if only a single item is available
    * when demand is signaled, it will be emitted regardless of priority.
    *
    * Backpressure is propagated from downstream to the source observables, so
    * that items from a given source will always be emitted downstream in the
    * same order as received from the source, and at most a single item from a
    * given source will be in flight at a time.
    */
  def mergePrioritizedList[A](sources: (Int, Observable[A])*): Observable[A] =
    // Without sources there is nothing to merge, so fall back to the empty observable.
    if (sources.nonEmpty) new MergePrioritizedListObservable[A](sources)
    else Observable.empty

  /** Given a list of source Observables, emits all of the items from
    * the first of these Observables to emit an item or to complete,
    * and cancels the rest.
    *
    * == Visual Example ==
    *
    * <pre>
    * stream1: - - 1 1 1 - 1 - 1 - -
    * stream2: - - - - - 2 2 2 2 2 2
    *
    * result: - - 1 1 1 - 1 - 1 - -
    * </pre>
    */
  def firstStartedOf[A](source: Observable[A]*): Observable[A] = {
    // The varargs sequence is re-expanded so the builder receives the sources individually.
    val winner: Observable[A] = new builders.FirstStartedObservable(source: _*)
    winner
  }

  /** Implicit type class instances for [[Observable]]. */
  implicit val catsInstances: CatsInstances = new CatsInstances()

  /** Cats instances for [[Observable]]. */
  class CatsInstances
    extends Bracket[Observable, Throwable] with Alternative[Observable] with CoflatMap[Observable]
    with FunctorFilter[Observable] with TaskLift[Observable] {

    // Every member forwards to the matching `Observable` operation or builder.

    // -- Construction

    override def unit: Observable[Unit] = Observable.unit

    override def pure[A](a: A): Observable[A] = Observable.now(a)

    override def empty[A]: Observable[A] = Observable.empty[A]

    override def raiseError[A](e: Throwable): Observable[A] = Observable.raiseError(e)

    // `TaskLift`: turns a `Task` into an `Observable`.
    override def apply[A](task: Task[A]): Observable[A] = Observable.fromTask(task)

    // -- Mapping and sequencing

    override def functor: Functor[Observable] = this

    override def map[A, B](fa: Observable[A])(f: A => B): Observable[B] = fa.map(f)

    override def flatMap[A, B](fa: Observable[A])(f: A => Observable[B]): Observable[B] = fa.flatMap(f)

    override def flatten[A](ffa: Observable[Observable[A]]): Observable[A] = ffa.flatten

    override def tailRecM[A, B](a: A)(f: A => Observable[Either[A, B]]): Observable[B] =
      Observable.tailRecM(a)(f)

    // `ap` and `map2` are expressed through `flatMap`, nesting the second source inside the first.
    override def ap[A, B](ff: Observable[A => B])(fa: Observable[A]): Observable[B] =
      ff.flatMap(fn => fa.map(a => fn(a)))

    override def map2[A, B, Z](fa: Observable[A], fb: Observable[B])(f: (A, B) => Z): Observable[Z] =
      fa.flatMap(a => fb.map(b => f(a, b)))

    // The whole source is handed to `f`; its result is emitted through `Observable.eval`.
    override def coflatMap[A, B](fa: Observable[A])(f: Observable[A] => B): Observable[B] =
      Observable.eval(f(fa))

    override def combineK[A](x: Observable[A], y: Observable[A]): Observable[A] = x.appendAll(y)

    // -- Filtering

    override def mapFilter[A, B](fa: Observable[A])(f: A => Option[B]): Observable[B] =
      fa.map(f).collect { case Some(value) => value }

    override def collect[A, B](fa: Observable[A])(f: PartialFunction[A, B]): Observable[B] = fa.collect(f)

    override def filter[A](fa: Observable[A])(f: A => Boolean): Observable[A] = fa.filter(f)

    // -- Error handling

    override def handleError[A](fa: Observable[A])(f: Throwable => A): Observable[A] =
      fa.onErrorHandle(f)

    override def handleErrorWith[A](fa: Observable[A])(f: Throwable => Observable[A]): Observable[A] =
      fa.onErrorHandleWith(f)

    override def recover[A](fa: Observable[A])(pf: PartialFunction[Throwable, A]): Observable[A] =
      fa.onErrorRecover(pf)

    override def recoverWith[A](fa: Observable[A])(pf: PartialFunction[Throwable, Observable[A]]): Observable[A] =
      fa.onErrorRecoverWith(pf)

    // -- Resource safety
    // Finalizers arrive as `Observable[Unit]`; each one is converted with `completedL`
    // before being handed to the underlying operator.

    override def bracketCase[A, B](acquire: Observable[A])(use: A => Observable[B])(
      release: (A, ExitCase[Throwable]) => Observable[Unit]
    ): Observable[B] =
      acquire.bracketCase(use) { (acquired, exit) =>
        release(acquired, exit).completedL
      }

    override def bracket[A, B](acquire: Observable[A])(use: A => Observable[B])(
      release: A => Observable[Unit]
    ): Observable[B] =
      acquire.bracket(use)(acquired => release(acquired).completedL)

    override def guarantee[A](fa: Observable[A])(finalizer: Observable[Unit]): Observable[A] =
      fa.guarantee(finalizer.completedL)

    override def guaranteeCase[A](fa: Observable[A])(
      finalizer: ExitCase[Throwable] => Observable[Unit]
    ): Observable[A] =
      fa.guaranteeCase(finalizer.andThen(_.completedL))

    override def uncancelable[A](fa: Observable[A]): Observable[A] = fa.uncancelable
  }

  /** [[cats.NonEmptyParallel]] instance for [[Observable]]. */
  implicit val observableNonEmptyParallel: NonEmptyParallel.Aux[Observable, CombineObservable.Type] =
    new NonEmptyParallel[Observable] {
      // The parallel counterpart of `Observable` is the `CombineObservable.Type` wrapper.
      override type F[A] = CombineObservable.Type[A]

      override def flatMap: FlatMap[Observable] = implicitly[FlatMap[Observable]]
      override def apply: Apply[CombineObservable.Type] = CombineObservable.combineObservableApplicative

      // Conversions between the two representations. `CombineObservable` is referenced
      // by its qualified name so it cannot be confused with the `apply` members here.
      override val sequential = new (CombineObservable.Type ~> Observable) {
        def apply[A](fa: CombineObservable.Type[A]): Observable[A] = CombineObservable.unwrap(fa)
      }
      override val parallel = new (Observable ~> CombineObservable.Type) {
        def apply[A](fa: Observable[A]): CombineObservable.Type[A] = CombineObservable.apply(fa)
      }
    }

  /** Exposes extension methods for deprecated [[Observable]] methods.
    *
    * Declared as an implicit value class (`extends AnyVal`) wrapping
    * `self`. It declares no methods of its own: everything it offers
    * comes from the `ObservableDeprecatedMethods` mix-in.
    *
    * @param self is the observable being extended
    */
  implicit final class DeprecatedExtensions[+A](val self: Observable[A])
    extends AnyVal with ObservableDeprecatedMethods[A]
}
